Author: markus Date: Wed Jun 19 08:31:28 2013 New Revision: 1494496 URL: http://svn.apache.org/r1494496 Log: NUTCH-1527 Elasticsearch indexer
Added: nutch/trunk/src/plugin/indexer-elastic/ nutch/trunk/src/plugin/indexer-elastic/build.xml nutch/trunk/src/plugin/indexer-elastic/ivy.xml nutch/trunk/src/plugin/indexer-elastic/plugin.xml nutch/trunk/src/plugin/indexer-elastic/src/ nutch/trunk/src/plugin/indexer-elastic/src/java/ nutch/trunk/src/plugin/indexer-elastic/src/java/org/ nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/ nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/ nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/ nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java Modified: nutch/trunk/CHANGES.txt nutch/trunk/conf/log4j.properties nutch/trunk/conf/nutch-default.xml nutch/trunk/src/plugin/build.xml Modified: nutch/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/nutch/trunk/CHANGES.txt?rev=1494496&r1=1494495&r2=1494496&view=diff ============================================================================== --- nutch/trunk/CHANGES.txt (original) +++ nutch/trunk/CHANGES.txt Wed Jun 19 08:31:28 2013 @@ -2,6 +2,8 @@ Nutch Change Log (trunk): Current Development +* NUTCH-1527 Elasticsearch indexer (lufeng + markus) + * NUTCH-1475 Index-More Plugin -- A better fall back value for date field (James Sullivan, snagel via lewismc) * NUTCH-1560 index-metadata to add all values of multivalued metadata (snagel) Modified: nutch/trunk/conf/log4j.properties URL: http://svn.apache.org/viewvc/nutch/trunk/conf/log4j.properties?rev=1494496&r1=1494495&r2=1494496&view=diff ============================================================================== --- nutch/trunk/conf/log4j.properties (original) +++ nutch/trunk/conf/log4j.properties Wed Jun 19 08:31:28 2013 @@ -22,6 +22,7 @@ 
log4j.logger.org.apache.nutch.segment.Se log4j.logger.org.apache.nutch.crawl.CrawlDb=INFO,cmdstdout log4j.logger.org.apache.nutch.crawl.LinkDb=INFO,cmdstdout log4j.logger.org.apache.nutch.crawl.LinkDbMerger=INFO,cmdstdout +log4j.logger.org.apache.nutch.indexer.IndexingJob=INFO,cmdstdout log4j.logger.org.apache.nutch.indexer.solr.SolrIndexer=INFO,cmdstdout log4j.logger.org.apache.nutch.indexer.solr.SolrWriter=INFO,cmdstdout log4j.logger.org.apache.nutch.indexer.solr.SolrDeleteDuplicates=INFO,cmdstdout Modified: nutch/trunk/conf/nutch-default.xml URL: http://svn.apache.org/viewvc/nutch/trunk/conf/nutch-default.xml?rev=1494496&r1=1494495&r2=1494496&view=diff ============================================================================== --- nutch/trunk/conf/nutch-default.xml (original) +++ nutch/trunk/conf/nutch-default.xml Wed Jun 19 08:31:28 2013 @@ -1413,6 +1413,46 @@ </description> </property> +<!-- Elasticsearch properties --> + +<property> + <name>elastic.host</name> + <value></value> + <description>The hostname to send documents to using TransportClient. Either host + and port must be defined or cluster.</description> +</property> + +<property> + <name>elastic.port</name> + <value>9300</value> + <description>The port to connect to using TransportClient.</description> +</property> + +<property> + <name>elastic.cluster</name> + <value></value> + <description>The cluster name to discover. 
Either host and port must be defined + or cluster.</description> +</property> + +<property> + <name>elastic.index</name> + <value>nutch</value> + <description>Default index to send documents to.</description> +</property> + +<property> + <name>elastic.max.bulk.docs</name> + <value>250</value> + <description>Maximum size of the bulk in number of documents.</description> +</property> + +<property> + <name>elastic.max.bulk.size</name> + <value>2500500</value> + <description>Maximum size of the bulk in bytes.</description> +</property> + <!-- subcollection properties --> <property> Modified: nutch/trunk/src/plugin/build.xml URL: http://svn.apache.org/viewvc/nutch/trunk/src/plugin/build.xml?rev=1494496&r1=1494495&r2=1494496&view=diff ============================================================================== --- nutch/trunk/src/plugin/build.xml (original) +++ nutch/trunk/src/plugin/build.xml Wed Jun 19 08:31:28 2013 @@ -34,6 +34,7 @@ <ant dir="index-more" target="deploy"/> <ant dir="index-static" target="deploy"/> <ant dir="index-metadata" target="deploy"/> + <ant dir="indexer-elastic" target="deploy"/> <ant dir="indexer-solr" target="deploy"/> <ant dir="language-identifier" target="deploy"/> <ant dir="lib-http" target="deploy"/> @@ -119,6 +120,7 @@ <ant dir="index-more" target="clean"/> <ant dir="index-static" target="clean"/> <ant dir="index-metadata" target="clean"/> + <ant dir="indexer-elastic" target="clean"/> <ant dir="indexer-solr" target="clean"/> <ant dir="language-identifier" target="clean"/> <ant dir="lib-commons-httpclient" target="clean"/> Added: nutch/trunk/src/plugin/indexer-elastic/build.xml URL: http://svn.apache.org/viewvc/nutch/trunk/src/plugin/indexer-elastic/build.xml?rev=1494496&view=auto ============================================================================== --- nutch/trunk/src/plugin/indexer-elastic/build.xml (added) +++ nutch/trunk/src/plugin/indexer-elastic/build.xml Wed Jun 19 08:31:28 2013 @@ -0,0 +1,22 @@ +<?xml version="1.0"?> 
+<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project name="indexer-elastic" default="jar-core"> + + <import file="../build-plugin.xml" /> + +</project> Added: nutch/trunk/src/plugin/indexer-elastic/ivy.xml URL: http://svn.apache.org/viewvc/nutch/trunk/src/plugin/indexer-elastic/ivy.xml?rev=1494496&view=auto ============================================================================== --- nutch/trunk/src/plugin/indexer-elastic/ivy.xml (added) +++ nutch/trunk/src/plugin/indexer-elastic/ivy.xml Wed Jun 19 08:31:28 2013 @@ -0,0 +1,43 @@ +<?xml version="1.0" ?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ See the License for the specific language governing permissions and + limitations under the License. +--> + +<ivy-module version="1.0"> + <info organisation="org.apache.nutch" module="${ant.project.name}"> + <license name="Apache 2.0"/> + <ivyauthor name="Apache Nutch Team" url="http://nutch.apache.org"/> + <description> + Apache Nutch + </description> + </info> + + <configurations> + <include file="../../..//ivy/ivy-configurations.xml"/> + </configurations> + + <publications> + <!--get the artifact from our module name--> + <artifact conf="master"/> + </publications> + + <dependencies> + <dependency org="org.elasticsearch" name="elasticsearch" rev="0.90.1" + conf="*->default"/> + </dependencies> + +</ivy-module> Added: nutch/trunk/src/plugin/indexer-elastic/plugin.xml URL: http://svn.apache.org/viewvc/nutch/trunk/src/plugin/indexer-elastic/plugin.xml?rev=1494496&view=auto ============================================================================== --- nutch/trunk/src/plugin/indexer-elastic/plugin.xml (added) +++ nutch/trunk/src/plugin/indexer-elastic/plugin.xml Wed Jun 19 08:31:28 2013 @@ -0,0 +1,55 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<plugin id="indexer-elastic" name="ElasticIndexWriter" version="1.0.0" + provider-name="nutch.apache.org"> + + <runtime> + <library name="indexer-elastic.jar"> + <export name="*" /> + </library> + + <library name="elasticsearch-0.90.1.jar"/> + <library name="jna-3.3.0.jar"/> + <library name="jts-1.12.jar"/> + <library name="log4j-1.2.17.jar"/> + <library name="lucene-codecs-4.3.0.jar"/> + <library name="lucene-core-4.3.0.jar"/> + <library name="lucene-grouping-4.3.0.jar"/> + <library name="lucene-highlighter-4.3.0.jar"/> + <library name="lucene-join-4.3.0.jar"/> + <library name="lucene-memory-4.3.0.jar"/> + <library name="lucene-queries-4.3.0.jar"/> + <library name="lucene-queryparser-4.3.0.jar"/> + <library name="lucene-sandbox-4.3.0.jar"/> + <library name="lucene-spatial-4.3.0.jar"/> + <library name="lucene-suggest-4.3.0.jar"/> + <library name="spatial4j-0.3.jar"/> + </runtime> + + <requires> + <import plugin="nutch-extensionpoints" /> + </requires> + + <extension id="org.apache.nutch.indexer.elastic" + name="Elasticsearch Index Writer" + point="org.apache.nutch.indexer.IndexWriter"> + <implementation id="ElasticIndexWriter" + class="org.apache.nutch.indexwriter.elastic.ElasticIndexWriter" /> + </extension> + +</plugin> Added: nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java URL: http://svn.apache.org/viewvc/nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java?rev=1494496&view=auto ============================================================================== --- nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java (added) +++ nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java Wed Jun 19 08:31:28 2013 @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nutch.indexwriter.elastic; + +/** Configuration property names used by the Elasticsearch index writer; each name is {@link #ELASTIC_PREFIX} followed by a property-specific suffix (note that {@link #MAX_BULK_LENGTH} maps to the "max.bulk.size" key). */ public interface ElasticConstants { + public static final String ELASTIC_PREFIX = "elastic."; + + public static final String HOST = ELASTIC_PREFIX + "host"; + public static final String PORT = ELASTIC_PREFIX + "port"; + public static final String CLUSTER = ELASTIC_PREFIX + "cluster"; + public static final String INDEX = ELASTIC_PREFIX + "index"; + public static final String MAX_BULK_DOCS = ELASTIC_PREFIX + "max.bulk.docs"; + public static final String MAX_BULK_LENGTH = ELASTIC_PREFIX + "max.bulk.size"; +} \ No newline at end of file Added: nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java URL: http://svn.apache.org/viewvc/nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java?rev=1494496&view=auto ============================================================================== --- nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java (added) +++ nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java Wed Jun 19 08:31:28 2013 @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or 
more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.indexwriter.elastic; + +import static org.elasticsearch.node.NodeBuilder.nodeBuilder; + +import java.io.IOException; +import java.net.URL; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.JobConf; +import org.apache.nutch.indexer.NutchDocument; +import org.apache.nutch.indexer.IndexWriter; +import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.action.ListenableActionFuture; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.delete.DeleteRequestBuilder; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.node.Node; +import org.slf4j.Logger; +import 
org.slf4j.LoggerFactory; + +/** + */ +public class ElasticIndexWriter implements IndexWriter { + public static Logger LOG = LoggerFactory.getLogger(ElasticIndexWriter.class); + + private static final int DEFAULT_MAX_BULK_DOCS = 250; + private static final int DEFAULT_MAX_BULK_LENGTH = 2500500; + + private Client client; + private Node node; + private String defaultIndex; + + private Configuration config; + + private BulkRequestBuilder bulk; + private ListenableActionFuture<BulkResponse> execute; + private int port = -1; + private String host = null; + private String clusterName = null; + private int maxBulkDocs; + private int maxBulkLength; + private long indexedDocs = 0; + private int bulkDocs = 0; + private int bulkLength = 0; + private boolean createNewBulk = false; + + @Override + public void open(JobConf job, String name) throws IOException { + clusterName = job.get(ElasticConstants.CLUSTER); + host = job.get(ElasticConstants.HOST); + port = job.getInt(ElasticConstants.PORT, -1); + + // Prefer TransportClient + if (host != null && port > 1) { + Settings settings = ImmutableSettings.settingsBuilder().put("cluster.name", clusterName).build(); + client = new TransportClient(settings).addTransportAddress(new InetSocketTransportAddress(host, port)); + } else if (clusterName != null) { + node = nodeBuilder().clusterName(clusterName).client(true).node(); + client = node.client(); + } + + bulk = client.prepareBulk(); + defaultIndex = job.get(ElasticConstants.INDEX, "nutch"); + maxBulkDocs = job.getInt( + ElasticConstants.MAX_BULK_DOCS, DEFAULT_MAX_BULK_DOCS); + maxBulkLength = job.getInt( + ElasticConstants.MAX_BULK_LENGTH, DEFAULT_MAX_BULK_LENGTH); + } + + @Override + public void write(NutchDocument doc) throws IOException { + String id = (String)doc.getFieldValue("url"); + String type = doc.getDocumentMeta().get("type"); + if (type == null) type = "doc"; + IndexRequestBuilder request = client.prepareIndex(defaultIndex, type, id); + + Map<String, Object> source = new 
HashMap<String, Object>(); + + // Loop through all fields of this doc + for (String fieldName : doc.getFieldNames()) { + if (doc.getField(fieldName).getValues().size() > 1) { + source.put(fieldName, doc.getFieldValue(fieldName)); + // Loop through the values to keep track of the size of this document + for (Object value : doc.getField(fieldName).getValues()) { + bulkLength += value.toString().length(); + } + } else { + source.put(fieldName, doc.getFieldValue(fieldName)); + bulkLength += doc.getFieldValue(fieldName).toString().length(); + } + } + request.setSource(source); + + // Add this indexing request to a bulk request + bulk.add(request); + indexedDocs++; + bulkDocs++; + + if (bulkDocs >= maxBulkDocs || bulkLength >= maxBulkLength) { + LOG.info("Processing bulk request [docs = " + bulkDocs + ", length = " + + bulkLength + ", total docs = " + indexedDocs + + ", last doc in bulk = '" + id + "']"); + // Flush the bulk of indexing requests + createNewBulk = true; + commit(); + } + } + + + @Override + public void delete(String key) throws IOException { + try{ + DeleteRequestBuilder builder = client.prepareDelete(); + builder.setId(key); + builder.execute().actionGet(); + }catch(ElasticSearchException e) + { + throw makeIOException(e); + } + } + + public static IOException makeIOException(ElasticSearchException e) { + final IOException ioe = new IOException(); + ioe.initCause(e); + return ioe; + } + + @Override + public void update(NutchDocument doc) throws IOException { + write(doc); + } + + @Override + public void commit() throws IOException { + if (execute != null) { + // wait for previous to finish + long beforeWait = System.currentTimeMillis(); + BulkResponse actionGet = execute.actionGet(); + if (actionGet.hasFailures()) { + for (BulkItemResponse item : actionGet) { + if (item.isFailed()) { + throw new RuntimeException("First failure in bulk: " + + item.getFailureMessage()); + } + } + } + long msWaited = System.currentTimeMillis() - beforeWait; + 
LOG.info("Previous took in ms " + actionGet.getTookInMillis() + + ", including wait " + msWaited); + execute = null; + } + if (bulk != null) { + if (bulkDocs > 0) { + // start a flush, note that this is an asynchronous call + execute = bulk.execute(); + } + bulk = null; + } + if (createNewBulk) { + // Prepare a new bulk request + bulk = client.prepareBulk(); + bulkDocs = 0; + bulkLength = 0; + } + } + + @Override + public void close() throws IOException { + // Flush pending requests + LOG.info("Processing remaining requests [docs = " + bulkDocs + + ", length = " + bulkLength + ", total docs = " + indexedDocs + "]"); + createNewBulk = false; + commit(); + // flush one more time to finalize the last bulk + LOG.info("Processing to finalize last execute"); + createNewBulk = false; + commit(); + + // Close + client.close(); + if (node != null) { + node.close(); + } + } + + @Override + public String describe() { + StringBuffer sb = new StringBuffer("ElasticIndexWriter\n"); + sb.append("\t").append(ElasticConstants.CLUSTER).append(" : elastic prefix cluster\n"); + sb.append("\t").append(ElasticConstants.HOST).append(" : hostname\n"); + sb.append("\t").append(ElasticConstants.PORT).append(" : port\n"); + sb.append("\t").append(ElasticConstants.INDEX).append(" : elastic index command \n"); + sb.append("\t").append(ElasticConstants.MAX_BULK_DOCS).append(" : elastic bulk index doc counts. (default 250) \n"); + sb.append("\t").append(ElasticConstants.MAX_BULK_LENGTH).append(" : elastic bulk index length. (default 2500500 ~2.5MB)\n"); + return sb.toString(); + } + + @Override + public void setConf(Configuration conf) { + config = conf; + String cluster = conf.get(ElasticConstants.CLUSTER); + if (cluster == null) { + String message = "Missing elastic.cluster. Should be set in nutch-site.xml "; + message+="\n"+describe(); + LOG.error(message); + throw new RuntimeException(message); + } + } + + @Override + public Configuration getConf() { + return config; + } +}