This is an automated email from the ASF dual-hosted git repository. markus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nutch.git
The following commit(s) were added to refs/heads/master by this push: new 64a6f7c NUTCH-2678 Allow for per-host configurable protocol plugin 64a6f7c is described below commit 64a6f7cc533f41e53cf6b637979080235b2147fa Author: Markus Jelsma <mar...@apache.org> AuthorDate: Fri Jan 18 13:27:16 2019 +0100 NUTCH-2678 Allow for per-host configurable protocol plugin --- conf/host-protocol-mapping.txt.template | 16 +++ src/java/org/apache/nutch/plugin/Extension.java | 4 + .../org/apache/nutch/protocol/ProtocolFactory.java | 129 +++++++++++++++++---- src/java/org/apache/nutch/util/ObjectCache.java | 4 + .../apache/nutch/protocol/TestProtocolFactory.java | 6 - 5 files changed, 130 insertions(+), 29 deletions(-) diff --git a/conf/host-protocol-mapping.txt.template b/conf/host-protocol-mapping.txt.template new file mode 100644 index 0000000..a09bca6 --- /dev/null +++ b/conf/host-protocol-mapping.txt.template @@ -0,0 +1,16 @@ +# This file defines a hostname to protocol plugin mapping. Each line takes a +# host name followed by a tab, followed by the ID of the protocol plugin. You +# can find the ID in the protocol plugin's plugin.xml file. +# +# <hostname>\t<plugin_id>\n +# nutch.apache.org org.apache.nutch.protocol.httpclient.Http +# tika.apache.org org.apache.nutch.protocol.http.Http +# +# If the requested host is not mapped, Nutch can choose any of the enabled +# plugins so you can force defaults using: +# +# protocol:<protocol>\t<plugin_id>\n +# +# This example forces httpclient for all protocol in case the host is not mapped: +# protocol:http org.apache.nutch.protocol.httpclient.Http +# protocol:https org.apache.nutch.protocol.httpclient.Http diff --git a/src/java/org/apache/nutch/plugin/Extension.java b/src/java/org/apache/nutch/plugin/Extension.java index e73b850..be737cb 100644 --- a/src/java/org/apache/nutch/plugin/Extension.java +++ b/src/java/org/apache/nutch/plugin/Extension.java @@ -197,4 +197,8 @@ public class Extension { public void setDescriptor(PluginDescriptor pDescriptor) { fDescriptor = pDescriptor; } + + public String toString() { + return getId() + ", " + getClazz() + ", " + getTargetPoint(); + } } diff --git a/src/java/org/apache/nutch/protocol/ProtocolFactory.java b/src/java/org/apache/nutch/protocol/ProtocolFactory.java index 2d20ecd..7f900b2 100644 --- a/src/java/org/apache/nutch/protocol/ProtocolFactory.java +++ b/src/java/org/apache/nutch/protocol/ProtocolFactory.java @@ -17,8 +17,13 @@ package org.apache.nutch.protocol; +import java.io.BufferedReader; +import java.io.IOException; +import java.lang.invoke.MethodHandles; import java.net.URL; import java.net.MalformedURLException; +import java.util.HashMap; +import java.util.Map; import org.apache.nutch.plugin.Extension; import org.apache.nutch.plugin.ExtensionPoint; @@ -26,8 +31,13 @@ import org.apache.nutch.plugin.PluginRepository; import org.apache.nutch.plugin.PluginRuntimeException; import org.apache.nutch.util.ObjectCache; +import org.apache.commons.lang.StringUtils; + import org.apache.hadoop.conf.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * Creates and caches {@link Protocol} plugins. Protocol plugins should define * the attribute "protocolName" with the name of the protocol that they @@ -37,10 +47,16 @@ import org.apache.hadoop.conf.Configuration; */ public class ProtocolFactory { + private static final Logger LOG = LoggerFactory + .getLogger(MethodHandles.lookup().lookupClass()); + private ExtensionPoint extensionPoint; private Configuration conf; + protected Map<String, String> defaultProtocolImplMapping = new HashMap<>(); + protected Map<String, String> hostProtocolMapping = new HashMap<>(); + public ProtocolFactory(Configuration conf) { this.conf = conf; this.extensionPoint = PluginRepository.get(conf).getExtensionPoint( @@ -49,8 +65,35 @@ public class ProtocolFactory { throw new RuntimeException("x-point " + Protocol.X_POINT_ID + " not found."); } - } + try { + BufferedReader reader = new BufferedReader(conf.getConfResourceAsReader("host-protocol-mapping.txt")); + String line; + String parts[]; + while ((line = reader.readLine()) != null) { + if (StringUtils.isNotBlank(line) && !line.startsWith("#")) { + line = line.trim(); + parts = line.split("\t"); + + // Must be at least two parts + if (parts.length == 2) { + // Is this a host to plugin mapping, or a default? + if (parts[0].indexOf(":") == -1) { + hostProtocolMapping.put(parts[0].trim(), parts[1].trim()); + } else { + String[] moreParts = parts[0].split(":"); + defaultProtocolImplMapping.put(moreParts[1].trim(), parts[1].trim()); + } + } else { + LOG.warn("Wrong format of line: {}", line); + LOG.warn("Expected format: <hostname> <tab> <plugin_id> or protocol:<protocol> <tab> <plugin_id>"); + } + } + } + } catch (IOException e) { + LOG.error("Unable to read host-protocol-mapping.txt", e); + } + } /** * Returns the appropriate {@link Protocol} implementation for a url. * @@ -83,52 +126,92 @@ public class ProtocolFactory { */ public Protocol getProtocol(URL url) throws ProtocolNotFound { - ObjectCache objectCache = ObjectCache.get(conf); try { - String protocolName = url.getProtocol(); - if (protocolName == null) { - throw new ProtocolNotFound(url.toString()); + Protocol protocol = null; + + // First attempt to resolve a protocol implementation by hostname + String host = url.getHost(); + if (hostProtocolMapping.containsKey(host)) { + Extension extension = getExtensionById(hostProtocolMapping.get(host)); + if (extension != null) { + protocol = getProtocolInstanceByExtension(extension); + } } - String cacheId = Protocol.X_POINT_ID + protocolName; - synchronized (objectCache) { - Protocol protocol = (Protocol) objectCache.getObject(cacheId); - if (protocol != null) { - return protocol; + // Nothing, see if we have defaults configured + if (protocol == null) { + // Protocol listed in default map? + if (defaultProtocolImplMapping.containsKey(url.getProtocol())) { + Extension extension = getExtensionById(defaultProtocolImplMapping.get(url.getProtocol())); + if (extension != null) { + protocol = getProtocolInstanceByExtension(extension); + } } + } - Extension extension = findExtension(protocolName); - if (extension == null) { - throw new ProtocolNotFound(protocolName); + // Still couldn't find a protocol? Attempt by protocol + if (protocol == null) { + Extension extension = findExtension(url.getProtocol(), "protocolName"); + if (extension != null) { + protocol = getProtocolInstanceByExtension(extension); } + } - protocol = (Protocol) extension.getExtensionInstance(); - objectCache.setObject(cacheId, protocol); + // Got anything? + if (protocol != null) { return protocol; } + + // Nothing! + throw new ProtocolNotFound(url.toString()); } catch (PluginRuntimeException e) { throw new ProtocolNotFound(url.toString(), e.toString()); } } - private Extension findExtension(String name) throws PluginRuntimeException { + private Protocol getProtocolInstanceByExtension(Extension extension) throws PluginRuntimeException { + Protocol protocol = null; + String cacheId = extension.getId(); + ObjectCache objectCache = ObjectCache.get(conf); + synchronized (objectCache) { + if (!objectCache.hasObject(cacheId)) { + protocol = (Protocol) extension.getExtensionInstance(); + objectCache.setObject(cacheId, protocol); + } + protocol = (Protocol) objectCache.getObject(cacheId); + } - Extension[] extensions = this.extensionPoint.getExtensions(); + return protocol; + } + private Extension getExtensionById(String id) { + Extension[] extensions = this.extensionPoint.getExtensions(); for (int i = 0; i < extensions.length; i++) { - Extension extension = extensions[i]; + if (id.equals(extensions[i].getId())) { + return extensions[i]; + } + } + + return null; + } - if (contains(name, extension.getAttribute("protocolName"))) + private Extension findExtension(String name, String attribute) throws PluginRuntimeException { + for (int i = 0; i < this.extensionPoint.getExtensions().length; i++) { + Extension extension = this.extensionPoint.getExtensions()[i]; + + if (contains(name, extension.getAttribute(attribute))) return extension; } return null; } boolean contains(String what, String where) { - String parts[] = where.split("[, ]"); - for (int i = 0; i < parts.length; i++) { - if (parts[i].equals(what)) - return true; + if (where != null) { + String parts[] = where.split("[, ]"); + for (int i = 0; i < parts.length; i++) { + if (parts[i].equals(what)) + return true; + } } return false; } diff --git a/src/java/org/apache/nutch/util/ObjectCache.java b/src/java/org/apache/nutch/util/ObjectCache.java index 4ed3fd0..f1b14c8 100644 --- a/src/java/org/apache/nutch/util/ObjectCache.java +++ b/src/java/org/apache/nutch/util/ObjectCache.java @@ -52,6 +52,10 @@ public class ObjectCache { return objectMap.get(key); } + public boolean hasObject(String key) { + return objectMap.containsKey(key); + } + public synchronized void setObject(String key, Object value) { objectMap.put(key, value); } diff --git a/src/test/org/apache/nutch/protocol/TestProtocolFactory.java b/src/test/org/apache/nutch/protocol/TestProtocolFactory.java index 394c303..7cab623 100644 --- a/src/test/org/apache/nutch/protocol/TestProtocolFactory.java +++ b/src/test/org/apache/nutch/protocol/TestProtocolFactory.java @@ -59,12 +59,6 @@ public class TestProtocolFactory { Assert.fail("Must not throw any other exception"); } - // cache key - Object protocol = ObjectCache.get(conf).getObject( - Protocol.X_POINT_ID + "http"); - Assert.assertNotNull(protocol); - Assert.assertEquals(httpProtocol, protocol); - // test same object instance try { Assert.assertTrue(httpProtocol == factory.getProtocol("http://somehost"));