This is an automated email from the ASF dual-hosted git repository. snagel pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nutch.git
commit 9c684114337241bb7652619c11b7374b27204dcb Author: Jurian Broertjes <[email protected]> AuthorDate: Tue Dec 12 15:52:59 2017 +0000 fix for NUTCH-2477 (refactor checker classes) contributed by Jurian Broertjes --- .../nutch/indexer/IndexingFiltersChecker.java | 125 ++------------- .../org/apache/nutch/net/URLFilterChecker.java | 126 +++++---------- src/java/org/apache/nutch/net/URLFilters.java | 4 + .../org/apache/nutch/net/URLNormalizerChecker.java | 102 +++++------- .../org/apache/nutch/util/AbstractChecker.java | 171 +++++++++++++++++++++ 5 files changed, 269 insertions(+), 259 deletions(-) diff --git a/src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java b/src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java index 05caf5a..5491638 100644 --- a/src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java +++ b/src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java @@ -17,23 +17,14 @@ package org.apache.nutch.indexer; -import java.io.BufferedReader; -import java.io.InputStreamReader; -import java.io.PrintWriter; import java.lang.invoke.MethodHandles; -import java.net.ServerSocket; -import java.net.Socket; -import java.net.InetSocketAddress; -import java.nio.charset.Charset; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; -import org.apache.hadoop.conf.Configured; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.nutch.crawl.CrawlDatum; import org.apache.nutch.crawl.Inlinks; @@ -52,6 +43,7 @@ import org.apache.nutch.protocol.ProtocolOutput; import org.apache.nutch.scoring.ScoringFilters; import org.apache.nutch.util.NutchConfiguration; import org.apache.nutch.util.StringUtil; +import org.apache.nutch.util.AbstractChecker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,41 +57,33 @@ import org.slf4j.LoggerFactory; * @author Julien Nioche **/ -public 
class IndexingFiltersChecker extends Configured implements Tool { +public class IndexingFiltersChecker extends AbstractChecker { protected URLNormalizers normalizers = null; protected boolean dumpText = false; protected boolean followRedirects = false; - protected boolean keepClientCnxOpen = false; // used to simulate the metadata propagated from injection protected HashMap<String, String> metadata = new HashMap<>(); - protected int tcpPort = -1; private static final Logger LOG = LoggerFactory .getLogger(MethodHandles.lookup().lookupClass()); - public IndexingFiltersChecker() { - - } - public int run(String[] args) throws Exception { String url = null; - String usage = "Usage: IndexingFiltersChecker [-normalize] [-followRedirects] [-dumpText] [-md key=value] [-listen <port>] [-keepClientCnxOpen]"; + usage = "Usage: IndexingFiltersChecker [-normalize] [-followRedirects] [-dumpText] [-md key=value] (-stdin | -listen <port> [-keepClientCnxOpen])"; - if (args.length == 0) { + // Print help when no args given + if (args.length < 1) { System.err.println(usage); - return -1; + System.exit(-1); } + int numConsumed; for (int i = 0; i < args.length; i++) { if (args[i].equals("-normalize")) { normalizers = new URLNormalizers(getConf(), URLNormalizers.SCOPE_DEFAULT); - } else if (args[i].equals("-listen")) { - tcpPort = Integer.parseInt(args[++i]); } else if (args[i].equals("-followRedirects")) { followRedirects = true; - } else if (args[i].equals("-keepClientCnxOpen")) { - keepClientCnxOpen = true; } else if (args[i].equals("-dumpText")) { dumpText = true; } else if (args[i].equals("-md")) { @@ -112,104 +96,27 @@ public class IndexingFiltersChecker extends Configured implements Tool { } else k = nextOne; metadata.put(k, v); + } else if ((numConsumed = super.parseArgs(args, i)) > 0) { + i += numConsumed - 1; } else if (i != args.length - 1) { + System.err.println("ERR: Not a recognized argument: " + args[i]); System.err.println(usage); System.exit(-1); } else { - url =args[i]; 
+ url = args[i]; } } - // In listening mode? - if (tcpPort == -1) { - // No, just fetch and display - StringBuilder output = new StringBuilder(); - int ret = fetch(url, output); - System.out.println(output); - return ret; + if (url != null) { + return super.processSingle(url); } else { - // Listen on socket and start workers on incoming requests - listen(); - } - - return 0; - } - - protected void listen() throws Exception { - ServerSocket server = null; - - try{ - server = new ServerSocket(); - server.bind(new InetSocketAddress(tcpPort)); - LOG.info(server.toString()); - } catch (Exception e) { - LOG.error("Could not listen on port " + tcpPort); - System.exit(-1); - } - - while(true){ - Worker worker; - try{ - worker = new Worker(server.accept()); - Thread thread = new Thread(worker); - thread.start(); - } catch (Exception e) { - LOG.error("Accept failed: " + tcpPort); - System.exit(-1); - } - } - } - - private class Worker implements Runnable { - private Socket client; - - Worker(Socket client) { - this.client = client; - LOG.info(client.toString()); - } - - public void run() { - if (keepClientCnxOpen) { - while (true) { // keep connection open until closes - readWrite(); - } - } else { - readWrite(); - - try { // close ourselves - client.close(); - } catch (Exception e){ - LOG.error(e.toString()); - } - } - } - - protected void readWrite() { - String line; - BufferedReader in = null; - PrintWriter out = null; - - try{ - in = new BufferedReader(new InputStreamReader(client.getInputStream())); - } catch (Exception e) { - LOG.error("in or out failed"); - System.exit(-1); - } - - try{ - line = in.readLine(); - StringBuilder output = new StringBuilder(); - fetch(line, output); - - client.getOutputStream().write(output.toString().getBytes(Charset.forName("UTF-8"))); - }catch (Exception e) { - LOG.error("Read/Write failed: " + e); - } + // Start listening + return super.run(); } } - protected int fetch(String url, StringBuilder output) throws Exception { + protected 
int process(String url, StringBuilder output) throws Exception { if (normalizers != null) { url = normalizers.normalize(url, URLNormalizers.SCOPE_DEFAULT); } diff --git a/src/java/org/apache/nutch/net/URLFilterChecker.java b/src/java/org/apache/nutch/net/URLFilterChecker.java index 6fb3cf2..429aa9f 100644 --- a/src/java/org/apache/nutch/net/URLFilterChecker.java +++ b/src/java/org/apache/nutch/net/URLFilterChecker.java @@ -21,8 +21,9 @@ import org.apache.nutch.plugin.Extension; import org.apache.nutch.plugin.ExtensionPoint; import org.apache.nutch.plugin.PluginRepository; -import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ToolRunner; +import org.apache.nutch.util.AbstractChecker; import org.apache.nutch.util.NutchConfiguration; import java.io.BufferedReader; @@ -33,103 +34,60 @@ import java.io.InputStreamReader; * * @author John Xing */ -public class URLFilterChecker { +public class URLFilterChecker extends AbstractChecker { - private Configuration conf; + private URLFilters filters = null; - public URLFilterChecker(Configuration conf) { - this.conf = conf; - } - - private void checkOne(String filterName) throws Exception { - URLFilter filter = null; - - ExtensionPoint point = PluginRepository.get(conf).getExtensionPoint( - URLFilter.X_POINT_ID); - - if (point == null) - throw new RuntimeException(URLFilter.X_POINT_ID + " not found."); - - Extension[] extensions = point.getExtensions(); - - for (int i = 0; i < extensions.length; i++) { - Extension extension = extensions[i]; - filter = (URLFilter) extension.getExtensionInstance(); - if (filter.getClass().getName().equals(filterName)) { - break; - } else { - filter = null; - } - } - - if (filter == null) - throw new RuntimeException("Filter " + filterName + " not found."); - - // jerome : should we keep this behavior? 
- // if (LogFormatter.hasLoggedSevere()) - // throw new RuntimeException("Severe error encountered."); - - System.out.println("Checking URLFilter " + filterName); - - BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); - String line; - while ((line = in.readLine()) != null) { - String out = filter.filter(line); - if (out != null) { - System.out.print("+"); - System.out.println(out); - } else { - System.out.print("-"); - System.out.println(line); - } - } - } + public int run(String[] args) throws Exception { + usage = "Usage: URLFilterChecker [-filterName filterName] (-stdin | -listen <port> [-keepClientCnxOpen]) \n" + + "\n\tTool takes a list of URLs, one per line.\n"; - private void checkAll() throws Exception { - System.out.println("Checking combination of all URLFilters available"); - - BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); - String line; - URLFilters filters = new URLFilters(this.conf); - - while ((line = in.readLine()) != null) { - String out = filters.filter(line); - if (out != null) { - System.out.print("+"); - System.out.println(out); - } else { - System.out.print("-"); - System.out.println(line); - } - } - } - - public static void main(String[] args) throws Exception { - - String usage = "Usage: URLFilterChecker (-filterName filterName | -allCombined) \n" - + "Tool takes a list of URLs, one per line, passed via STDIN.\n"; - - if (args.length == 0) { + // Print help when no args given + if (args.length < 1) { System.err.println(usage); System.exit(-1); } - String filterName = null; - if (args[0].equals("-filterName")) { - if (args.length != 2) { + int numConsumed; + for (int i = 0; i < args.length; i++) { + if (args[i].equals("-filterName")) { + getConf().set("plugin.includes", args[++i]); + } else if ((numConsumed = super.parseArgs(args, i)) > 0) { + i += numConsumed - 1; + } else { + System.err.println("ERR: Not a recognized argument: " + args[i]); System.err.println(usage); System.exit(-1); } - 
filterName = args[1]; } - URLFilterChecker checker = new URLFilterChecker(NutchConfiguration.create()); - if (filterName != null) { - checker.checkOne(filterName); + // Print active filter list + filters = new URLFilters(getConf()); + System.out.print("Checking combination of these URLFilters: "); + for (URLFilter filter : filters.getFilters()) { + System.out.print(filter.getClass().getSimpleName() + " "); + } + System.out.println(""); + + // Start listening + return super.run(); + } + + protected int process(String line, StringBuilder output) throws Exception { + String out = filters.filter(line); + if (out != null) { + output.append("+"); + output.append(out); } else { - checker.checkAll(); + output.append("-"); + output.append(line); } + return 0; + } - System.exit(0); + public static void main(String[] args) throws Exception { + final int res = ToolRunner.run(NutchConfiguration.create(), + new URLFilterChecker(), args); + System.exit(res); } } diff --git a/src/java/org/apache/nutch/net/URLFilters.java b/src/java/org/apache/nutch/net/URLFilters.java index 3deccca..4f5bf36 100644 --- a/src/java/org/apache/nutch/net/URLFilters.java +++ b/src/java/org/apache/nutch/net/URLFilters.java @@ -31,6 +31,10 @@ public class URLFilters { URLFilter.class, URLFilter.X_POINT_ID, URLFILTER_ORDER); } + public URLFilter[] getFilters() { + return this.filters; + } + /** Run all defined filters. Assume logical AND. 
*/ public String filter(String urlString) throws URLFilterException { for (int i = 0; i < this.filters.length; i++) { diff --git a/src/java/org/apache/nutch/net/URLNormalizerChecker.java b/src/java/org/apache/nutch/net/URLNormalizerChecker.java index d8f1c6e..a435cc8 100644 --- a/src/java/org/apache/nutch/net/URLNormalizerChecker.java +++ b/src/java/org/apache/nutch/net/URLNormalizerChecker.java @@ -21,8 +21,9 @@ import org.apache.nutch.plugin.Extension; import org.apache.nutch.plugin.ExtensionPoint; import org.apache.nutch.plugin.PluginRepository; -import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ToolRunner; +import org.apache.nutch.util.AbstractChecker; import org.apache.nutch.util.NutchConfiguration; import java.io.BufferedReader; @@ -31,87 +32,56 @@ import java.io.InputStreamReader; /** * Checks one given normalizer or all normalizers. */ -public class URLNormalizerChecker { +public class URLNormalizerChecker extends AbstractChecker { - private Configuration conf; + private String scope = URLNormalizers.SCOPE_DEFAULT; + URLNormalizers normalizers; - public URLNormalizerChecker(Configuration conf) { - this.conf = conf; - } - - private void checkOne(String normalizerName, String scope) throws Exception { - URLNormalizer normalizer = null; - - ExtensionPoint point = PluginRepository.get(conf).getExtensionPoint( - URLNormalizer.X_POINT_ID); - - if (point == null) - throw new RuntimeException(URLNormalizer.X_POINT_ID + " not found."); - - Extension[] extensions = point.getExtensions(); - - for (int i = 0; i < extensions.length; i++) { - Extension extension = extensions[i]; - normalizer = (URLNormalizer) extension.getExtensionInstance(); - if (normalizer.getClass().getName().equals(normalizerName)) { - break; - } else { - normalizer = null; - } - } - - if (normalizer == null) - throw new RuntimeException("URLNormalizer " + normalizerName - + " not found."); - - System.out.println("Checking URLNormalizer " + normalizerName); - - 
BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); - String line; - while ((line = in.readLine()) != null) { - String out = normalizer.normalize(line, scope); - System.out.println(out); - } - } - - private void checkAll(String scope) throws Exception { - System.out.println("Checking combination of all URLNormalizers available"); + public int run(String[] args) throws Exception { + usage = "Usage: URLNormalizerChecker [-normalizer <normalizerName>] [-scope <scope>] (-stdin | -listen <port> [-keepClientCnxOpen])" + + "\n\tscope can be one of: default,partition,generate_host_count,fetcher,crawldb,linkdb,inject,outlink\n"; - BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); - String line; - URLNormalizers normalizers = new URLNormalizers(conf, scope); - while ((line = in.readLine()) != null) { - String out = normalizers.normalize(line, scope); - System.out.println(out); + // Print help when no args given + if (args.length < 1) { + System.err.println(usage); + System.exit(-1); } - } - - public static void main(String[] args) throws Exception { - String usage = "Usage: URLNormalizerChecker [-normalizer <normalizerName>] [-scope <scope>]" - + "\n\tscope can be one of: default,partition,generate_host_count,fetcher,crawldb,linkdb,inject,outlink"; - - String normalizerName = null; - String scope = URLNormalizers.SCOPE_DEFAULT; + int numConsumed; for (int i = 0; i < args.length; i++) { if (args[i].equals("-normalizer")) { - normalizerName = args[++i]; + getConf().set("plugin.includes", args[++i]); } else if (args[i].equals("-scope")) { scope = args[++i]; + } else if ((numConsumed = super.parseArgs(args, i)) > 0) { + i += numConsumed - 1; } else { + System.err.println("ERR: Not a recognized argument: " + args[i]); System.err.println(usage); System.exit(-1); } } - URLNormalizerChecker checker = new URLNormalizerChecker( - NutchConfiguration.create()); - if (normalizerName != null) { - checker.checkOne(normalizerName, scope); - } 
else { - checker.checkAll(scope); + // Print active normalizer list + normalizers = new URLNormalizers(getConf(), scope); + System.out.print("Checking combination of these URLNormalizers: "); + for (URLNormalizer normalizer : normalizers.getURLNormalizers(scope)) { + System.out.print(normalizer.getClass().getSimpleName() + " "); } + System.out.println(""); + + // Start listening + return super.run(); + } - System.exit(0); + protected int process(String line, StringBuilder output) throws Exception { + output.append(normalizers.normalize(line, scope)); + return 0; + } + + public static void main(String[] args) throws Exception { + final int res = ToolRunner.run(NutchConfiguration.create(), + new URLNormalizerChecker(), args); + System.exit(res); } } diff --git a/src/java/org/apache/nutch/util/AbstractChecker.java b/src/java/org/apache/nutch/util/AbstractChecker.java new file mode 100644 index 0000000..8424879 --- /dev/null +++ b/src/java/org/apache/nutch/util/AbstractChecker.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.nutch.util;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.lang.invoke.MethodHandles;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.Tool;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Scaffolding class for the various Checker implementations. Can process
+ * cmdline input, stdin and TCP connections.
+ * <p>
+ * Subclasses implement {@link #process(String, StringBuilder)} for a single
+ * input, hand command-line options they do not know to
+ * {@link #parseArgs(String[], int)}, and finally call either
+ * {@link #processSingle(String)} or {@link #run()}.
+ *
+ * @author Jurian Broertjes
+ */
+public abstract class AbstractChecker extends Configured implements Tool {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(MethodHandles.lookup().lookupClass());
+
+  /** If true, a TCP client may send several inputs over one connection. */
+  protected boolean keepClientCnxOpen = false;
+  /** Port to listen on; -1 (the default) means "not in listening mode". */
+  protected int tcpPort = -1;
+  /**
+   * Read inputs from stdin when not listening. Defaults to true, so the
+   * -stdin option only makes this explicit.
+   */
+  protected boolean stdin = true;
+  /** Usage message; set by the concrete checker. */
+  protected String usage;
+
+  /**
+   * Processes a single input and appends the printable result to
+   * {@code output}.
+   *
+   * @param line the input to check (one line of input)
+   * @param output buffer receiving the result
+   * @return a status code; {@link #processSingle(String)} returns it as-is
+   * @throws Exception if processing fails
+   */
+  protected abstract int process(String line, StringBuilder output)
+      throws Exception;
+
+  /**
+   * Parses the options shared by all checkers at position {@code i}:
+   * {@code -listen <port>}, {@code -keepClientCnxOpen} and {@code -stdin}.
+   *
+   * @param args command-line arguments
+   * @param i index of the argument to inspect
+   * @return number of arguments consumed, or 0 if {@code args[i]} is not a
+   *         shared option
+   * @throws IllegalArgumentException if {@code -listen} is not followed by a
+   *         numeric port
+   */
+  protected int parseArgs(String[] args, int i) {
+    if (args[i].equals("-listen")) {
+      if (i + 1 >= args.length) {
+        throw new IllegalArgumentException("-listen requires a port number");
+      }
+      try {
+        tcpPort = Integer.parseInt(args[i + 1]);
+      } catch (NumberFormatException e) {
+        throw new IllegalArgumentException(
+            "Invalid port number for -listen: " + args[i + 1], e);
+      }
+      return 2;
+    } else if (args[i].equals("-keepClientCnxOpen")) {
+      keepClientCnxOpen = true;
+      return 1;
+    } else if (args[i].equals("-stdin")) {
+      stdin = true;
+      return 1;
+    }
+    return 0;
+  }
+
+  /**
+   * Serves TCP clients if a port was given, otherwise processes stdin.
+   *
+   * @return 0 on success; -1 if there is nothing to do, which can only
+   *         happen when a subclass has cleared {@link #stdin}
+   * @throws Exception if processing fails
+   */
+  protected int run() throws Exception {
+    if (tcpPort != -1) {
+      // processTCP() loops until the JVM exits.
+      processTCP(tcpPort);
+      return 0;
+    } else if (stdin) {
+      return processStdin();
+    }
+    // Nothing to do
+    return -1;
+  }
+
+  /**
+   * Processes one input, prints the result to stdout and returns the status
+   * code of {@link #process(String, StringBuilder)}.
+   */
+  protected int processSingle(String input) throws Exception {
+    StringBuilder output = new StringBuilder();
+    int ret = process(input, output);
+    System.out.println(output);
+    return ret;
+  }
+
+  /**
+   * Processes stdin line by line, printing one result per input line.
+   *
+   * @return always 0; per-line status codes are not aggregated
+   */
+  protected int processStdin() throws Exception {
+    // System.in is deliberately not closed: it belongs to the JVM.
+    BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
+    String line;
+    while ((line = in.readLine()) != null) {
+      StringBuilder output = new StringBuilder();
+      process(line, output);
+      System.out.println(output);
+    }
+    return 0;
+  }
+
+  /**
+   * Listens on {@code tcpPort} and serves every accepted client in its own
+   * thread. Never returns normally; exits the JVM with -1 if the port cannot
+   * be bound or accepting a connection fails.
+   */
+  protected void processTCP(int tcpPort) throws Exception {
+    ServerSocket server = null;
+
+    try {
+      server = new ServerSocket();
+      server.bind(new InetSocketAddress(tcpPort));
+      LOG.info(server.toString());
+    } catch (Exception e) {
+      LOG.error("Could not listen on port " + tcpPort, e);
+      System.exit(-1);
+    }
+
+    while (true) {
+      try {
+        Thread thread = new Thread(new Worker(server.accept()));
+        thread.start();
+      } catch (Exception e) {
+        LOG.error("Accept failed: " + tcpPort, e);
+        System.exit(-1);
+      }
+    }
+  }
+
+  /**
+   * Serves one client connection. The protocol is line-based: each input
+   * line is answered with the output of process() followed by '\n'. If
+   * processing fails, the answer is an empty line. Input and output are
+   * UTF-8.
+   */
+  private class Worker implements Runnable {
+    private final Socket client;
+
+    Worker(Socket client) {
+      this.client = client;
+      LOG.info(client.toString());
+    }
+
+    @Override
+    public void run() {
+      // One reader per connection: a new BufferedReader per request would
+      // lose any bytes the previous one had already buffered.
+      try (Socket socket = client;
+          BufferedReader in = new BufferedReader(new InputStreamReader(
+              socket.getInputStream(), StandardCharsets.UTF_8))) {
+        OutputStream out = socket.getOutputStream();
+        if (keepClientCnxOpen) {
+          while (readWrite(in, out)) {
+            // keep serving until the client closes the connection
+          }
+        } else {
+          readWrite(in, out);
+        }
+      } catch (IOException e) {
+        // Only this connection is affected; the server keeps running.
+        LOG.error("Read/Write failed", e);
+      }
+    }
+
+    /**
+     * Reads one input line, processes it and writes the answer.
+     *
+     * @return false if the client has closed its side (end of stream)
+     * @throws IOException if reading from or writing to the socket fails
+     */
+    private boolean readWrite(BufferedReader in, OutputStream out)
+        throws IOException {
+      String line = in.readLine();
+      if (line == null) {
+        return false;
+      }
+
+      StringBuilder output = new StringBuilder();
+      try {
+        process(line, output);
+      } catch (Exception e) {
+        LOG.error("Processing failed for input: " + line, e);
+        // Still answer, so a keep-alive client is not left waiting.
+        output.setLength(0);
+      }
+      output.append('\n');
+      out.write(output.toString().getBytes(StandardCharsets.UTF_8));
+      out.flush();
+      return true;
+    }
+  }
+}
\ No newline at end of file
--
To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
