Merge branch 'master' into NUTCH-2293
Project: http://git-wip-us.apache.org/repos/asf/nutch/repo Commit: http://git-wip-us.apache.org/repos/asf/nutch/commit/2175c767 Tree: http://git-wip-us.apache.org/repos/asf/nutch/tree/2175c767 Diff: http://git-wip-us.apache.org/repos/asf/nutch/diff/2175c767 Branch: refs/heads/NUTCH-2292 Commit: 2175c767c64f92b77ebe497750241e4740acea9b Parents: 0bf453e 80afa31 Author: Thamme Gowda <[email protected]> Authored: Sat Jul 16 14:32:19 2016 -0700 Committer: Thamme Gowda <[email protected]> Committed: Sat Jul 16 14:32:19 2016 -0700 ---------------------------------------------------------------------- build.xml | 1 + conf/nutch-default.xml | 25 +- nutch-plugins/build.xml | 1 + nutch-plugins/indexer-elastic/build.xml | 13 + nutch-plugins/indexer-elastic/plugin.xml | 5 +- .../indexwriter/elastic/ElasticConstants.java | 5 +- .../indexwriter/elastic/ElasticIndexWriter.java | 236 +++++++++---------- .../nutch/indexwriter/solr/SolrUtils.java | 8 +- .../src/test/conf/nutch-site-test.xml | 57 +++++ .../elastic/TestElasticIndexWriter.java | 221 +++++++++++++++++ 10 files changed, 436 insertions(+), 136 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nutch/blob/2175c767/nutch-plugins/build.xml ---------------------------------------------------------------------- diff --cc nutch-plugins/build.xml index 75ae2e7,0000000..20ef870 mode 100755,000000..100755 --- a/nutch-plugins/build.xml +++ b/nutch-plugins/build.xml @@@ -1,213 -1,0 +1,214 @@@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project name="Nutch" default="deploy-core" basedir="."> + + <target name="deploy-core"> + <ant target="compile-core" inheritall="false" dir="../.."/> + <ant target="deploy"/> + </target> + + <!-- ====================================================== --> + <!-- Build & deploy all the plugin jars. --> + <!-- ====================================================== --> + <target name="deploy"> + <ant dir="creativecommons" target="deploy"/> + <ant dir="feed" target="deploy"/> + <ant dir="headings" target="deploy"/> + <ant dir="index-basic" target="deploy"/> + <ant dir="index-anchor" target="deploy"/> + <ant dir="index-geoip" target="deploy"/> + <ant dir="index-more" target="deploy"/> + <ant dir="index-replace" target="deploy"/> + <ant dir="index-static" target="deploy"/> + <ant dir="index-metadata" target="deploy"/> + <ant dir="index-links" target="deploy"/> + <ant dir="mimetype-filter" target="deploy"/> + <ant dir="indexer-cloudsearch" target="deploy"/> + <ant dir="indexer-dummy" target="deploy"/> + <ant dir="indexer-elastic" target="deploy"/> + <ant dir="indexer-solr" target="deploy"/> + <ant dir="language-identifier" target="deploy"/> + <ant dir="lib-http" target="deploy"/> + <ant dir="lib-nekohtml" target="deploy"/> + <ant dir="lib-regex-filter" target="deploy"/> + <ant dir="lib-xml" target="deploy"/> + <ant dir="microformats-reltag" target="deploy"/> + <ant dir="nutch-extensionpoints" target="deploy"/> + <ant dir="protocol-file" target="deploy"/> + <ant dir="protocol-ftp" target="deploy"/> + <ant dir="protocol-http" target="deploy"/> + <ant dir="protocol-httpclient" target="deploy"/> + <ant dir="lib-htmlunit" target="deploy"/> + <ant dir="protocol-htmlunit" target="deploy" /> + <ant dir="lib-selenium" target="deploy"/> + <ant dir="protocol-selenium" target="deploy" /> + <ant dir="protocol-interactiveselenium" target="deploy" /> + <ant dir="parse-ext" target="deploy"/> + <ant dir="parse-js" target="deploy"/> + <ant dir="parse-html" target="deploy"/> + <ant dir="parse-metatags" target="deploy"/> + <ant dir="parse-swf" target="deploy"/> + <ant dir="parse-tika" target="deploy"/> + <ant dir="parse-zip" target="deploy"/> + <ant dir="scoring-depth" target="deploy"/> + <ant dir="scoring-opic" target="deploy"/> + <ant dir="scoring-link" target="deploy"/> + <ant dir="scoring-similarity" target="deploy"/> + <ant dir="subcollection" target="deploy"/> + <ant dir="tld" target="deploy"/> + <ant dir="urlfilter-automaton" target="deploy"/> + <ant dir="urlfilter-domain" target="deploy" /> + <ant dir="urlfilter-domainblacklist" target="deploy" /> + <ant dir="urlfilter-prefix" target="deploy"/> + <ant dir="urlfilter-regex" target="deploy"/> + <ant dir="urlfilter-suffix" target="deploy"/> + <ant dir="urlfilter-validator" target="deploy"/> + <ant dir="urlfilter-ignoreexempt" target="deploy"/> + <ant dir="parsefilter-naivebayes" target="deploy"/> + <ant dir="parsefilter-regex" target="deploy"/> + <ant dir="urlmeta" target="deploy"/> + <ant dir="urlnormalizer-ajax" target="deploy"/> + <ant dir="urlnormalizer-basic" target="deploy"/> + <ant dir="urlnormalizer-host" target="deploy"/> + <ant dir="urlnormalizer-pass" target="deploy"/> + <ant dir="urlnormalizer-protocol" target="deploy"/> + <ant dir="urlnormalizer-querystring" target="deploy"/> + <ant dir="urlnormalizer-regex" target="deploy"/> + <ant dir="urlnormalizer-slash" target="deploy"/> + </target> + + <!-- ====================================================== --> + <!-- Test all of the plugins. --> + <!-- ====================================================== --> + <target name="test"> + <parallel threadCount="2"> + <ant dir="creativecommons" target="test"/> + <ant dir="index-basic" target="test"/> + <ant dir="index-anchor" target="test"/> + <ant dir="index-geoip" target="test"/> + <ant dir="index-more" target="test"/> + <ant dir="index-static" target="test"/> + <ant dir="index-replace" target="test"/> + <ant dir="index-links" target="test"/> + <ant dir="mimetype-filter" target="test"/> ++ <ant dir="indexer-elastic" target="test"/> + <ant dir="language-identifier" target="test"/> + <ant dir="lib-http" target="test"/> + <ant dir="protocol-file" target="test"/> + <ant dir="protocol-http" target="test"/> + <ant dir="protocol-httpclient" target="test"/> + <!--ant dir="parse-ext" target="test"/--> + <ant dir="feed" target="test"/> + <ant dir="parse-html" target="test"/> + <ant dir="parse-metatags" target="test"/> + <ant dir="parse-swf" target="test"/> + <ant dir="parse-tika" target="test"/> + <ant dir="parse-zip" target="test"/> + <ant dir="parsefilter-regex" target="test"/> + <ant dir="subcollection" target="test"/> + <ant dir="urlfilter-automaton" target="test"/> + <ant dir="urlfilter-domain" target="test"/> + <ant dir="urlfilter-domainblacklist" target="test"/> + <ant dir="urlfilter-prefix" target="test"/> + <ant dir="urlfilter-regex" target="test"/> + <ant dir="urlfilter-suffix" target="test"/> + <ant dir="urlfilter-validator" target="test"/> + <ant dir="urlfilter-ignoreexempt" target="test"/> + <ant dir="urlnormalizer-ajax" target="test"/> + <ant dir="urlnormalizer-basic" target="test"/> + <ant dir="urlnormalizer-host" target="test"/> + <ant dir="urlnormalizer-pass" target="test"/> + <ant dir="urlnormalizer-protocol" target="test"/> + <ant dir="urlnormalizer-querystring" target="test"/> + <ant dir="urlnormalizer-regex" target="test"/> + <ant dir="urlnormalizer-slash" target="test"/> + </parallel> + </target> + + <!-- ====================================================== --> + <!-- Clean all of the plugins. --> + <!-- ====================================================== --> + <target name="clean"> + <ant dir="creativecommons" target="clean"/> + <ant dir="feed" target="clean"/> + <ant dir="headings" target="clean"/> + <ant dir="index-basic" target="clean"/> + <ant dir="index-anchor" target="clean"/> + <ant dir="index-geoip" target="clean"/> + <ant dir="index-more" target="clean"/> + <ant dir="index-static" target="clean"/> + <ant dir="index-replace" target="clean"/> + <ant dir="index-metadata" target="clean"/> + <ant dir="index-links" target="clean"/> + <ant dir="mimetype-filter" target="clean"/> + <ant dir="indexer-cloudsearch" target="clean"/> + <ant dir="indexer-dummy" target="clean"/> + <ant dir="indexer-elastic" target="clean"/> + <ant dir="indexer-solr" target="clean"/> + <ant dir="language-identifier" target="clean"/> + <!-- <ant dir="lib-commons-httpclient" target="clean"/> --> + <ant dir="lib-http" target="clean"/> + <!-- <ant dir="lib-lucene-analyzers" target="clean"/>--> + <ant dir="lib-nekohtml" target="clean"/> + <ant dir="lib-regex-filter" target="clean"/> + <ant dir="lib-xml" target="clean"/> + <ant dir="microformats-reltag" target="clean"/> + <ant dir="nutch-extensionpoints" target="clean"/> + <ant dir="protocol-file" target="clean"/> + <ant dir="protocol-ftp" target="clean"/> + <ant dir="protocol-http" target="clean"/> + <ant dir="protocol-httpclient" target="clean"/> + <ant dir="lib-htmlunit" target="clean"/> + <ant dir="protocol-htmlunit" target="clean" /> + <ant dir="lib-selenium" target="clean"/> + <ant dir="protocol-selenium" target="clean" /> + <ant dir="protocol-interactiveselenium" target="clean" /> + <ant dir="parse-ext" target="clean"/> + <ant dir="parse-js" target="clean"/> + <ant dir="parse-html" target="clean"/> + <ant dir="parse-metatags" target="clean"/> + <ant dir="parse-swf" target="clean"/> + <ant dir="parse-tika" target="clean"/> + <ant dir="parse-zip" target="clean"/> + <ant dir="parsefilter-regex" target="clean"/> + <ant dir="scoring-depth" target="clean"/> + <ant dir="scoring-opic" target="clean"/> + <ant dir="scoring-link" target="clean"/> + <ant dir="scoring-similarity" target="clean"/> + <ant dir="subcollection" target="clean"/> + <ant dir="tld" target="clean"/> + <ant dir="urlfilter-automaton" target="clean"/> + <ant dir="urlfilter-domain" target="clean" /> + <ant dir="urlfilter-domainblacklist" target="clean" /> + <ant dir="urlfilter-prefix" target="clean"/> + <ant dir="urlfilter-regex" target="clean"/> + <ant dir="urlfilter-suffix" target="clean"/> + <ant dir="urlfilter-validator" target="clean"/> + <ant dir="urlfilter-ignoreexempt" target="clean"/> + <ant dir="parsefilter-naivebayes" target="clean" /> + <ant dir="urlmeta" target="clean"/> + <ant dir="urlnormalizer-ajax" target="clean"/> + <ant dir="urlnormalizer-basic" target="clean"/> + <ant dir="urlnormalizer-host" target="clean"/> + <ant dir="urlnormalizer-pass" target="clean"/> + <ant dir="urlnormalizer-protocol" target="clean"/> + <ant dir="urlnormalizer-querystring" target="clean"/> + <ant dir="urlnormalizer-regex" target="clean"/> + <ant dir="urlnormalizer-slash" target="clean"/> + </target> +</project> http://git-wip-us.apache.org/repos/asf/nutch/blob/2175c767/nutch-plugins/indexer-elastic/build.xml ---------------------------------------------------------------------- diff --cc nutch-plugins/indexer-elastic/build.xml index 38955ff,0000000..6955f61 mode 100644,000000..100644 --- a/nutch-plugins/indexer-elastic/build.xml +++ b/nutch-plugins/indexer-elastic/build.xml @@@ -1,22 -1,0 +1,35 @@@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project name="indexer-elastic" default="jar-core"> + + <import file="../build-plugin.xml" /> + ++ <!-- Add compilation dependencies to classpath --> ++ <path id="plugin.deps"> ++ <pathelement location="${build.dir}/test/conf"/> ++ </path> ++ ++ <!-- Deploy Unit test dependencies --> ++ <target name="deps-test"> ++ <copy toDir="${build.test}"> ++ <fileset dir="${src.test}" excludes="**/*.java"/> ++ </copy> ++ </target> ++ ++ +</project> http://git-wip-us.apache.org/repos/asf/nutch/blob/2175c767/nutch-plugins/indexer-elastic/plugin.xml ---------------------------------------------------------------------- diff --cc nutch-plugins/indexer-elastic/plugin.xml index d99a665,0000000..401e342 mode 100644,000000..100644 --- a/nutch-plugins/indexer-elastic/plugin.xml +++ b/nutch-plugins/indexer-elastic/plugin.xml @@@ -1,71 -1,0 +1,70 @@@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at - ++ + http://www.apache.org/licenses/LICENSE-2.0 - ++ + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<plugin id="indexer-elastic" name="ElasticIndexWriter" version="1.0.0" + provider-name="nutch.apache.org"> + + <runtime> + <library name="indexer-elastic.jar"> + <export name="*" /> + </library> + <library name="elasticsearch-2.3.3.jar"/> + <library name="commons-cli-1.3.1.jar"/> + <library name="compress-lzf-1.0.2.jar"/> + <library name="guava-18.0.jar"/> + <library name="HdrHistogram-2.1.6.jar"/> + <library name="hppc-0.7.1.jar"/> - <library name="indexer-elastic.jar"/> + <library name="jackson-core-2.6.6.jar"/> + <library name="jackson-dataformat-cbor-2.6.6.jar"/> + <library name="jackson-dataformat-smile-2.6.6.jar"/> + <library name="jackson-dataformat-yaml-2.6.6.jar"/> + <library name="joda-convert-1.2.jar"/> + <library name="joda-time-2.8.2.jar"/> + <library name="jsr166e-1.1.0.jar"/> + <library name="lucene-analyzers-common-5.5.0.jar"/> + <library name="lucene-backward-codecs-5.5.0.jar"/> + <library name="lucene-core-5.5.0.jar"/> + <library name="lucene-grouping-5.5.0.jar"/> + <library name="lucene-highlighter-5.5.0.jar"/> + <library name="lucene-join-5.5.0.jar"/> + <library name="lucene-memory-5.5.0.jar"/> + <library name="lucene-misc-5.5.0.jar"/> + <library name="lucene-queries-5.5.0.jar"/> + <library name="lucene-queryparser-5.5.0.jar"/> + <library name="lucene-sandbox-5.5.0.jar"/> + <library name="lucene-spatial-5.5.0.jar"/> + <library name="lucene-spatial3d-5.5.0.jar"/> + <library name="lucene-suggest-5.5.0.jar"/> + <library name="netty-3.10.5.Final.jar"/> + <library name="securesm-1.0.jar"/> + <library name="snakeyaml-1.15.jar"/> + <library name="spatial4j-0.5.jar"/> + <library name="t-digest-3.0.jar"/> + </runtime> + + <requires> + <import plugin="nutch-extensionpoints" /> + </requires> + + <extension id="org.apache.nutch.indexer.elastic" + name="Elasticsearch Index Writer" + point="org.apache.nutch.indexer.IndexWriter"> + <implementation id="ElasticIndexWriter" + class="org.apache.nutch.indexwriter.elastic.ElasticIndexWriter" /> + </extension> + +</plugin> http://git-wip-us.apache.org/repos/asf/nutch/blob/2175c767/nutch-plugins/indexer-elastic/src/main/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java ---------------------------------------------------------------------- diff --cc nutch-plugins/indexer-elastic/src/main/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java index b0e70c8,0000000..29f36c7 mode 100644,000000..100644 --- a/nutch-plugins/indexer-elastic/src/main/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java +++ b/nutch-plugins/indexer-elastic/src/main/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java @@@ -1,28 -1,0 +1,31 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nutch.indexwriter.elastic; + +public interface ElasticConstants { + public static final String ELASTIC_PREFIX = "elastic."; + - public static final String HOST = ELASTIC_PREFIX + "host"; ++ public static final String HOSTS = ELASTIC_PREFIX + "host"; + public static final String PORT = ELASTIC_PREFIX + "port"; + public static final String CLUSTER = ELASTIC_PREFIX + "cluster"; + public static final String INDEX = ELASTIC_PREFIX + "index"; + public static final String MAX_BULK_DOCS = ELASTIC_PREFIX + "max.bulk.docs"; + public static final String MAX_BULK_LENGTH = ELASTIC_PREFIX + "max.bulk.size"; ++ public static final String EXPONENTIAL_BACKOFF_MILLIS = ELASTIC_PREFIX + "exponential.backoff.millis"; ++ public static final String EXPONENTIAL_BACKOFF_RETRIES = ELASTIC_PREFIX + "exponential.backoff.retries"; ++ public static final String BULK_CLOSE_TIMEOUT = ELASTIC_PREFIX + "bulk.close.timeout"; +} http://git-wip-us.apache.org/repos/asf/nutch/blob/2175c767/nutch-plugins/indexer-elastic/src/main/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java ---------------------------------------------------------------------- diff --cc nutch-plugins/indexer-elastic/src/main/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java index 9367e41,0000000..00b96f1 mode 100644,000000..100644 --- a/nutch-plugins/indexer-elastic/src/main/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java +++ b/nutch-plugins/indexer-elastic/src/main/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java @@@ -1,279 -1,0 +1,261 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.indexwriter.elastic; + +import static org.elasticsearch.node.NodeBuilder.nodeBuilder; + +import java.io.BufferedReader; +import java.io.IOException; +import java.net.InetAddress; +import java.util.HashMap; +import java.util.Map; ++import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.JobConf; +import org.apache.nutch.indexer.IndexWriter; +import org.apache.nutch.indexer.NutchDocument; - import org.elasticsearch.ElasticsearchException; - import org.elasticsearch.action.ListenableActionFuture; - import org.elasticsearch.action.bulk.BulkItemResponse; - import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; - import org.elasticsearch.action.delete.DeleteRequestBuilder; - import org.elasticsearch.action.index.IndexRequestBuilder; ++import org.elasticsearch.action.bulk.BulkRequest; ++import org.elasticsearch.action.bulk.BackoffPolicy; ++import org.elasticsearch.action.bulk.BulkProcessor; ++import org.elasticsearch.action.delete.DeleteRequest; ++import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.transport.TransportClient; ++import org.elasticsearch.common.unit.ByteSizeUnit; ++import org.elasticsearch.common.unit.ByteSizeValue; ++import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.settings.Settings; - import org.elasticsearch.common.settings.Settings.Builder; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.node.Node; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** ++ * Sends NutchDocuments to a configured Elasticsearch index. + */ +public class ElasticIndexWriter implements IndexWriter { + public static Logger LOG = LoggerFactory.getLogger(ElasticIndexWriter.class); + ++ private static final int DEFAULT_PORT = 9300; + private static final int DEFAULT_MAX_BULK_DOCS = 250; + private static final int DEFAULT_MAX_BULK_LENGTH = 2500500; ++ private static final int DEFAULT_EXP_BACKOFF_MILLIS = 100; ++ private static final int DEFAULT_EXP_BACKOFF_RETRIES = 10; ++ private static final int DEFAULT_BULK_CLOSE_TIMEOUT = 600; ++ private static final String DEFAULT_INDEX = "nutch"; + ++ private String defaultIndex; + private Client client; + private Node node; - private String defaultIndex; ++ private BulkProcessor bulkProcessor; + - private Configuration config; ++ private long bulkCloseTimeout; + - private BulkRequestBuilder bulk; - private ListenableActionFuture<BulkResponse> execute; - private int port = -1; - private String host = null; - private String clusterName = null; - private int maxBulkDocs; - private int maxBulkLength; - private long indexedDocs = 0; - private int bulkDocs = 0; - private int bulkLength = 0; - private boolean createNewBulk = false; ++ private Configuration config; + + @Override + public void open(JobConf job, String name) throws IOException { - clusterName = job.get(ElasticConstants.CLUSTER); ++ bulkCloseTimeout = job.getLong(ElasticConstants.BULK_CLOSE_TIMEOUT, ++ DEFAULT_BULK_CLOSE_TIMEOUT); ++ defaultIndex = job.get(ElasticConstants.INDEX, DEFAULT_INDEX); ++ ++ int maxBulkDocs = job.getInt(ElasticConstants.MAX_BULK_DOCS, ++ DEFAULT_MAX_BULK_DOCS); ++ int maxBulkLength = job.getInt(ElasticConstants.MAX_BULK_LENGTH, ++ DEFAULT_MAX_BULK_LENGTH); ++ int expBackoffMillis = job.getInt(ElasticConstants.EXPONENTIAL_BACKOFF_MILLIS, ++ DEFAULT_EXP_BACKOFF_MILLIS); ++ int expBackoffRetries = job.getInt(ElasticConstants.EXPONENTIAL_BACKOFF_RETRIES, ++ DEFAULT_EXP_BACKOFF_RETRIES); ++ ++ client = makeClient(job); ++ ++ LOG.debug("Creating BulkProcessor with maxBulkDocs={}, maxBulkLength={}", maxBulkDocs, maxBulkLength); ++ bulkProcessor = BulkProcessor.builder(client, bulkProcessorListener()) ++ .setBulkActions(maxBulkDocs) ++ .setBulkSize(new ByteSizeValue(maxBulkLength, ByteSizeUnit.BYTES)) ++ .setConcurrentRequests(1) ++ .setBackoffPolicy(BackoffPolicy.exponentialBackoff( ++ TimeValue.timeValueMillis(expBackoffMillis), expBackoffRetries)) ++ .build(); ++ } + - host = job.get(ElasticConstants.HOST); - port = job.getInt(ElasticConstants.PORT, 9300); ++ /** Generates a TransportClient or NodeClient */ ++ protected Client makeClient(Configuration conf) throws IOException { ++ String clusterName = conf.get(ElasticConstants.CLUSTER); ++ String[] hosts = conf.getStrings(ElasticConstants.HOSTS); ++ int port = conf.getInt(ElasticConstants.PORT, DEFAULT_PORT); + - Builder settingsBuilder = Settings.builder(); ++ Settings.Builder settingsBuilder = Settings.settingsBuilder(); + + BufferedReader reader = new BufferedReader( - job.getConfResourceAsReader("elasticsearch.conf")); ++ conf.getConfResourceAsReader("elasticsearch.conf")); + String line; + String parts[]; - + while ((line = reader.readLine()) != null) { + if (StringUtils.isNotBlank(line) && !line.startsWith("#")) { + line.trim(); + parts = line.split("="); + + if (parts.length == 2) { + settingsBuilder.put(parts[0].trim(), parts[1].trim()); + } + } + } + ++ // Set the cluster name and build the settings + if (StringUtils.isNotBlank(clusterName)) + settingsBuilder.put("cluster.name", clusterName); + - // Set the cluster name and build the settings + Settings settings = settingsBuilder.build(); + ++ Client client = null; ++ + // Prefer TransportClient - if (host != null && port > 1) { - TransportClient transportClient = TransportClient.builder() - .settings(settings).build() - .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port)); ++ if (hosts != null && port > 1) { ++ TransportClient transportClient = TransportClient.builder().settings(settings).build(); ++ for (String host: hosts) ++ transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port)); + client = transportClient; + } else if (clusterName != null) { + node = nodeBuilder().settings(settings).client(true).node(); + client = node.client(); + } + - bulk = client.prepareBulk(); - defaultIndex = job.get(ElasticConstants.INDEX, "nutch"); - maxBulkDocs = job.getInt(ElasticConstants.MAX_BULK_DOCS, - DEFAULT_MAX_BULK_DOCS); - maxBulkLength = job.getInt(ElasticConstants.MAX_BULK_LENGTH, - DEFAULT_MAX_BULK_LENGTH); ++ return client; ++ } ++ ++ /** Generates a default BulkProcessor.Listener */ ++ protected BulkProcessor.Listener bulkProcessorListener() { ++ return new BulkProcessor.Listener() { ++ @Override ++ public void beforeBulk(long executionId, BulkRequest request) { } ++ ++ @Override ++ public void afterBulk(long executionId, BulkRequest request, Throwable failure) { ++ throw new RuntimeException(failure); ++ } ++ ++ @Override ++ public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { ++ if (response.hasFailures()) { ++ LOG.warn("Failures occurred during bulk request"); ++ } ++ } ++ }; + } + + @Override + public void write(NutchDocument doc) throws IOException { + String id = (String) doc.getFieldValue("id"); + String type = doc.getDocumentMeta().get("type"); + if (type == null) + type = "doc"; - IndexRequestBuilder request = client.prepareIndex(defaultIndex, type, id); + ++ // Add each field of this doc to the index source + Map<String, Object> source = new HashMap<String, Object>(); - - // Loop through all fields of this doc + for (String fieldName : doc.getFieldNames()) { - if (doc.getField(fieldName).getValues().size() > 1) { ++ if (doc.getFieldValue(fieldName) != null) { + source.put(fieldName, doc.getFieldValue(fieldName)); - // Loop through the values to keep track of the size of this - // document - for (Object value : doc.getField(fieldName).getValues()) { - bulkLength += value.toString().length(); - } - } else { - if (doc.getFieldValue(fieldName) != null) { - source.put(fieldName, doc.getFieldValue(fieldName)); - bulkLength += doc.getFieldValue(fieldName).toString().length(); - } + } + } - request.setSource(source); - - // Add this indexing request to a bulk request - bulk.add(request); - indexedDocs++; - bulkDocs++; - - if (bulkDocs >= maxBulkDocs || bulkLength >= maxBulkLength) { - LOG.info("Processing bulk request [docs = " + bulkDocs + ", length = " - + bulkLength + ", total docs = " + indexedDocs - + ", last doc in bulk = '" + id + "']"); - // Flush the bulk of indexing requests - createNewBulk = true; - commit(); - } ++ ++ IndexRequest request = new IndexRequest(defaultIndex, type, id).source(source); ++ bulkProcessor.add(request); + } + + @Override + public void delete(String key) throws IOException { - try { - DeleteRequestBuilder builder = client.prepareDelete(); - builder.setIndex(defaultIndex); - builder.setType("doc"); - builder.setId(key); - builder.execute().actionGet(); - } catch (ElasticsearchException e) { - throw makeIOException(e); - } - } - - public static IOException makeIOException(ElasticsearchException e) { - final IOException ioe = new IOException(); - ioe.initCause(e); - return ioe; ++ DeleteRequest request = new DeleteRequest(defaultIndex, "doc", key); ++ bulkProcessor.add(request); + } + + @Override + public void update(NutchDocument doc) throws IOException { + write(doc); + } + + @Override + public void commit() throws IOException { - if (execute != null) { - // wait for previous to finish - long beforeWait = System.currentTimeMillis(); - BulkResponse actionGet = execute.actionGet(); - if (actionGet.hasFailures()) { - for (BulkItemResponse item : actionGet) { - if (item.isFailed()) { - throw new RuntimeException("First failure in bulk: " - + item.getFailureMessage()); - } - } - } - long msWaited = System.currentTimeMillis() - beforeWait; - LOG.info("Previous took in ms " + actionGet.getTookInMillis() - + ", including wait " + msWaited); - execute = null; - } - if (bulk != null) { - if (bulkDocs > 0) { - // start a flush, note that this is an asynchronous call - execute = bulk.execute(); - } - bulk = null; - } - if (createNewBulk) { - // Prepare a new bulk request - bulk = client.prepareBulk(); - bulkDocs = 0; - bulkLength = 0; - } ++ bulkProcessor.flush(); + } + + @Override + public void close() throws IOException { - // Flush pending requests - LOG.info("Processing remaining requests [docs = " + bulkDocs - + ", length = " + bulkLength + ", total docs = " + indexedDocs + "]"); - createNewBulk = false; - commit(); - // flush one more time to finalize the last bulk - LOG.info("Processing to finalize last execute"); - createNewBulk = false; - commit(); - - // Close ++ // Close BulkProcessor (automatically flushes) ++ try { ++ bulkProcessor.awaitClose(bulkCloseTimeout, TimeUnit.SECONDS); ++ } catch (InterruptedException e) { ++ LOG.warn("interrupted while waiting for BulkProcessor to complete ({})", e.getMessage()); ++ } ++ + client.close(); + if (node != null) { + node.close(); + } + } + + @Override + public String describe() { + StringBuffer sb = new StringBuffer("ElasticIndexWriter\n"); + sb.append("\t").append(ElasticConstants.CLUSTER) + .append(" : elastic prefix cluster\n"); - sb.append("\t").append(ElasticConstants.HOST).append(" : hostname\n"); ++ sb.append("\t").append(ElasticConstants.HOSTS).append(" : hostname\n"); + sb.append("\t").append(ElasticConstants.PORT).append(" : port\n"); + sb.append("\t").append(ElasticConstants.INDEX) + .append(" : elastic index command \n"); + sb.append("\t").append(ElasticConstants.MAX_BULK_DOCS) - .append(" : elastic bulk index doc counts. (default 250) \n"); ++ .append(" : elastic bulk index doc counts. (default ") ++ .append(DEFAULT_MAX_BULK_DOCS).append(")\n"); + sb.append("\t").append(ElasticConstants.MAX_BULK_LENGTH) - .append(" : elastic bulk index length. (default 2500500 ~2.5MB)\n"); ++ .append(" : elastic bulk index length in bytes. (default ") ++ .append(DEFAULT_MAX_BULK_LENGTH).append(")\n"); ++ sb.append("\t").append(ElasticConstants.EXPONENTIAL_BACKOFF_MILLIS) ++ .append(" : elastic bulk exponential backoff initial delay in milliseconds. (default ") ++ .append(DEFAULT_EXP_BACKOFF_MILLIS).append(")\n"); ++ sb.append("\t").append(ElasticConstants.EXPONENTIAL_BACKOFF_RETRIES) ++ .append(" : elastic bulk exponential backoff max retries. (default ") ++ .append(DEFAULT_EXP_BACKOFF_RETRIES).append(")\n"); ++ sb.append("\t").append(ElasticConstants.BULK_CLOSE_TIMEOUT) ++ .append(" : elastic timeout for the last bulk in seconds. (default ") ++ .append(DEFAULT_BULK_CLOSE_TIMEOUT).append(")\n"); + return sb.toString(); + } + + @Override + public void setConf(Configuration conf) { + config = conf; + String cluster = conf.get(ElasticConstants.CLUSTER); - String host = conf.get(ElasticConstants.HOST); ++ String hosts = conf.get(ElasticConstants.HOSTS); + - if (StringUtils.isBlank(cluster) && StringUtils.isBlank(host)) { ++ if (StringUtils.isBlank(cluster) && StringUtils.isBlank(hosts)) { + String message = "Missing elastic.cluster and elastic.host. At least one of them should be set in nutch-site.xml "; + message += "\n" + describe(); + LOG.error(message); + throw new RuntimeException(message); + } + } + + @Override + public Configuration getConf() { + return config; + } +} http://git-wip-us.apache.org/repos/asf/nutch/blob/2175c767/nutch-plugins/indexer-solr/src/main/java/org/apache/nutch/indexwriter/solr/SolrUtils.java ---------------------------------------------------------------------- diff --cc nutch-plugins/indexer-solr/src/main/java/org/apache/nutch/indexwriter/solr/SolrUtils.java index eec0080,0000000..d70bc62 mode 100644,000000..100644 --- a/nutch-plugins/indexer-solr/src/main/java/org/apache/nutch/indexwriter/solr/SolrUtils.java +++ b/nutch-plugins/indexer-solr/src/main/java/org/apache/nutch/indexwriter/solr/SolrUtils.java @@@ -1,97 -1,0 +1,99 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nutch.indexwriter.solr; + + +import java.util.ArrayList; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.mapred.JobConf; ++import org.apache.http.impl.client.SystemDefaultHttpClient; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.impl.HttpSolrClient; +import org.apache.solr.client.solrj.impl.CloudSolrClient; + +import java.net.MalformedURLException; + +public class SolrUtils { + + public static Logger LOG = LoggerFactory.getLogger(SolrUtils.class); ++ private static HttpClient HTTP_CLIENT = new SystemDefaultHttpClient(); + + /** + * + * + * @param JobConf + * @return SolrClient + */ + public static ArrayList<SolrClient> getSolrClients(JobConf job) throws MalformedURLException { + String[] urls = job.getStrings(SolrConstants.SERVER_URL); + String[] zkHostString = job.getStrings(SolrConstants.ZOOKEEPER_HOSTS); + ArrayList<SolrClient> solrClients = new ArrayList<SolrClient>(); + + if (zkHostString != null && zkHostString.length > 0) { + for (int i = 0; i < zkHostString.length; i++) { + CloudSolrClient sc = getCloudSolrClient(zkHostString[i]); + sc.setDefaultCollection(job.get(SolrConstants.COLLECTION)); + solrClients.add(sc); + } + } else { + for (int i = 0; i < urls.length; i++) { - SolrClient sc = new HttpSolrClient(urls[i]); ++ SolrClient sc = new HttpSolrClient(urls[i], HTTP_CLIENT); + solrClients.add(sc); + } + } + + return solrClients; + } + + public static CloudSolrClient getCloudSolrClient(String url) throws MalformedURLException { - CloudSolrClient sc = new CloudSolrClient(url.replace('|', ',')); ++ CloudSolrClient sc = new CloudSolrClient(url.replace('|', ','), HTTP_CLIENT); + sc.setParallelUpdates(true); + sc.connect(); + return sc; + } + + public static SolrClient getHttpSolrClient(String url) throws MalformedURLException { - SolrClient sc =new HttpSolrClient(url); ++ SolrClient sc =new HttpSolrClient(url, HTTP_CLIENT); + return sc; + } + + public static String stripNonCharCodepoints(String input) { + StringBuilder retval = new StringBuilder(); + char ch; + + for (int i = 0; i < input.length(); i++) { + ch = input.charAt(i); + + // Strip all non-characters + // http://unicode.org/cldr/utility/list-unicodeset.jsp?a=[:Noncharacter_Code_Point=True:] + // and non-printable control characters except tabulator, new line and + // carriage return + if (ch % 0x10000 != 0xffff && // 0xffff - 0x10ffff range step 0x10000 + ch % 0x10000 != 0xfffe && // 0xfffe - 0x10fffe range + (ch <= 0xfdd0 || ch >= 0xfdef) && // 0xfdd0 - 0xfdef + (ch > 0x1F || ch == 0x9 || ch == 0xa || ch == 0xd)) { + + retval.append(ch); + } + } + + return retval.toString(); + } + +}
