Author: markus
Date: Wed Jun 19 08:31:28 2013
New Revision: 1494496
URL: http://svn.apache.org/r1494496
Log:
NUTCH-1527 Elasticsearch indexer
Added:
nutch/trunk/src/plugin/indexer-elastic/
nutch/trunk/src/plugin/indexer-elastic/build.xml
nutch/trunk/src/plugin/indexer-elastic/ivy.xml
nutch/trunk/src/plugin/indexer-elastic/plugin.xml
nutch/trunk/src/plugin/indexer-elastic/src/
nutch/trunk/src/plugin/indexer-elastic/src/java/
nutch/trunk/src/plugin/indexer-elastic/src/java/org/
nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/
nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/
nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/
nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/
nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java
nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
Modified:
nutch/trunk/CHANGES.txt
nutch/trunk/conf/log4j.properties
nutch/trunk/conf/nutch-default.xml
nutch/trunk/src/plugin/build.xml
Modified: nutch/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/nutch/trunk/CHANGES.txt?rev=1494496&r1=1494495&r2=1494496&view=diff
==============================================================================
--- nutch/trunk/CHANGES.txt (original)
+++ nutch/trunk/CHANGES.txt Wed Jun 19 08:31:28 2013
@@ -2,6 +2,8 @@ Nutch Change Log
(trunk): Current Development
+* NUTCH-1527 Elasticsearch indexer (lufeng + markus)
+
* NUTCH-1475 Index-More Plugin -- A better fall back value for date field
(James Sullivan, snagel via lewismc)
* NUTCH-1560 index-metadata to add all values of multivalued metadata (snagel)
Modified: nutch/trunk/conf/log4j.properties
URL:
http://svn.apache.org/viewvc/nutch/trunk/conf/log4j.properties?rev=1494496&r1=1494495&r2=1494496&view=diff
==============================================================================
--- nutch/trunk/conf/log4j.properties (original)
+++ nutch/trunk/conf/log4j.properties Wed Jun 19 08:31:28 2013
@@ -22,6 +22,7 @@ log4j.logger.org.apache.nutch.segment.Se
log4j.logger.org.apache.nutch.crawl.CrawlDb=INFO,cmdstdout
log4j.logger.org.apache.nutch.crawl.LinkDb=INFO,cmdstdout
log4j.logger.org.apache.nutch.crawl.LinkDbMerger=INFO,cmdstdout
+log4j.logger.org.apache.nutch.indexer.IndexingJob=INFO,cmdstdout
log4j.logger.org.apache.nutch.indexer.solr.SolrIndexer=INFO,cmdstdout
log4j.logger.org.apache.nutch.indexer.solr.SolrWriter=INFO,cmdstdout
log4j.logger.org.apache.nutch.indexer.solr.SolrDeleteDuplicates=INFO,cmdstdout
Modified: nutch/trunk/conf/nutch-default.xml
URL:
http://svn.apache.org/viewvc/nutch/trunk/conf/nutch-default.xml?rev=1494496&r1=1494495&r2=1494496&view=diff
==============================================================================
--- nutch/trunk/conf/nutch-default.xml (original)
+++ nutch/trunk/conf/nutch-default.xml Wed Jun 19 08:31:28 2013
@@ -1413,6 +1413,46 @@
</description>
</property>
+<!-- Elasticsearch properties -->
+
+<property>
+ <name>elastic.host</name>
+ <value></value>
+ <description>The hostname to send documents to using TransportClient. Either
host
+ and port must be defined or cluster.</description>
+</property>
+
+<property>
+ <name>elastic.port</name>
+ <value>9300</value>The port to connect to using TransportClient.<description>
+ </description>
+</property>
+
+<property>
+ <name>elastic.cluster</name>
+ <value></value>
+ <description>The cluster name to discover. Either host and potr must be
defined
+ or cluster.</description>
+</property>
+
+<property>
+ <name>elastic.index</name>
+ <value>nutch</value>
+ <description>Default index to send documents to.</description>
+</property>
+
+<property>
+ <name>elastic.max.bulk.docs</name>
+ <value>250</value>
+ <description>Maximum size of the bulk in number of documents.</description>
+</property>
+
+<property>
+ <name>elastic.max.bulk.size</name>
+ <value>2500500</value>
+ <description>Maximum size of the bulk in bytes.</description>
+</property>
+
<!-- subcollection properties -->
<property>
Modified: nutch/trunk/src/plugin/build.xml
URL:
http://svn.apache.org/viewvc/nutch/trunk/src/plugin/build.xml?rev=1494496&r1=1494495&r2=1494496&view=diff
==============================================================================
--- nutch/trunk/src/plugin/build.xml (original)
+++ nutch/trunk/src/plugin/build.xml Wed Jun 19 08:31:28 2013
@@ -34,6 +34,7 @@
<ant dir="index-more" target="deploy"/>
<ant dir="index-static" target="deploy"/>
<ant dir="index-metadata" target="deploy"/>
+ <ant dir="indexer-elastic" target="deploy"/>
<ant dir="indexer-solr" target="deploy"/>
<ant dir="language-identifier" target="deploy"/>
<ant dir="lib-http" target="deploy"/>
@@ -119,6 +120,7 @@
<ant dir="index-more" target="clean"/>
<ant dir="index-static" target="clean"/>
<ant dir="index-metadata" target="clean"/>
+ <ant dir="indexer-elastic" target="clean"/>
<ant dir="indexer-solr" target="clean"/>
<ant dir="language-identifier" target="clean"/>
<ant dir="lib-commons-httpclient" target="clean"/>
Added: nutch/trunk/src/plugin/indexer-elastic/build.xml
URL:
http://svn.apache.org/viewvc/nutch/trunk/src/plugin/indexer-elastic/build.xml?rev=1494496&view=auto
==============================================================================
--- nutch/trunk/src/plugin/indexer-elastic/build.xml (added)
+++ nutch/trunk/src/plugin/indexer-elastic/build.xml Wed Jun 19 08:31:28 2013
@@ -0,0 +1,22 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project name="indexer-elastic" default="jar-core">
+
+ <import file="../build-plugin.xml" />
+
+</project>
Added: nutch/trunk/src/plugin/indexer-elastic/ivy.xml
URL:
http://svn.apache.org/viewvc/nutch/trunk/src/plugin/indexer-elastic/ivy.xml?rev=1494496&view=auto
==============================================================================
--- nutch/trunk/src/plugin/indexer-elastic/ivy.xml (added)
+++ nutch/trunk/src/plugin/indexer-elastic/ivy.xml Wed Jun 19 08:31:28 2013
@@ -0,0 +1,43 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<ivy-module version="1.0">
+ <info organisation="org.apache.nutch" module="${ant.project.name}">
+ <license name="Apache 2.0"/>
+ <ivyauthor name="Apache Nutch Team" url="http://nutch.apache.org"/>
+ <description>
+ Apache Nutch
+ </description>
+ </info>
+
+ <configurations>
+ <include file="../../..//ivy/ivy-configurations.xml"/>
+ </configurations>
+
+ <publications>
+ <!--get the artifact from our module name-->
+ <artifact conf="master"/>
+ </publications>
+
+ <dependencies>
+ <dependency org="org.elasticsearch" name="elasticsearch" rev="0.90.1"
+ conf="*->default"/>
+ </dependencies>
+
+</ivy-module>
Added: nutch/trunk/src/plugin/indexer-elastic/plugin.xml
URL:
http://svn.apache.org/viewvc/nutch/trunk/src/plugin/indexer-elastic/plugin.xml?rev=1494496&view=auto
==============================================================================
--- nutch/trunk/src/plugin/indexer-elastic/plugin.xml (added)
+++ nutch/trunk/src/plugin/indexer-elastic/plugin.xml Wed Jun 19 08:31:28 2013
@@ -0,0 +1,55 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<plugin id="indexer-elastic" name="ElasticIndexWriter" version="1.0.0"
+ provider-name="nutch.apache.org">
+
+ <runtime>
+ <library name="indexer-elastic.jar">
+ <export name="*" />
+ </library>
+
+ <library name="elasticsearch-0.90.1.jar"/>
+ <library name="jna-3.3.0.jar"/>
+ <library name="jts-1.12.jar"/>
+ <library name="log4j-1.2.17.jar"/>
+ <library name="lucene-codecs-4.3.0.jar"/>
+ <library name="lucene-core-4.3.0.jar"/>
+ <library name="lucene-grouping-4.3.0.jar"/>
+ <library name="lucene-highlighter-4.3.0.jar"/>
+ <library name="lucene-join-4.3.0.jar"/>
+ <library name="lucene-memory-4.3.0.jar"/>
+ <library name="lucene-queries-4.3.0.jar"/>
+ <library name="lucene-queryparser-4.3.0.jar"/>
+ <library name="lucene-sandbox-4.3.0.jar"/>
+ <library name="lucene-spatial-4.3.0.jar"/>
+ <library name="lucene-suggest-4.3.0.jar"/>
+ <library name="spatial4j-0.3.jar"/>
+ </runtime>
+
+ <requires>
+ <import plugin="nutch-extensionpoints" />
+ </requires>
+
+ <extension id="org.apache.nutch.indexer.elastic"
+ name="Elasticsearch Index Writer"
+ point="org.apache.nutch.indexer.IndexWriter">
+ <implementation id="ElasticIndexWriter"
+ class="org.apache.nutch.indexwriter.elastic.ElasticIndexWriter" />
+ </extension>
+
+</plugin>
Added:
nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java
URL:
http://svn.apache.org/viewvc/nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java?rev=1494496&view=auto
==============================================================================
---
nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java
(added)
+++
nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java
Wed Jun 19 08:31:28 2013
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.indexwriter.elastic;
+
+public interface ElasticConstants {
+ public static final String ELASTIC_PREFIX = "elastic.";
+
+ public static final String HOST = ELASTIC_PREFIX + "host";
+ public static final String PORT = ELASTIC_PREFIX + "port";
+ public static final String CLUSTER = ELASTIC_PREFIX + "cluster";
+ public static final String INDEX = ELASTIC_PREFIX + "index";
+ public static final String MAX_BULK_DOCS = ELASTIC_PREFIX + "max.bulk.docs";
+ public static final String MAX_BULK_LENGTH = ELASTIC_PREFIX +
"max.bulk.size";
+}
\ No newline at end of file
Added:
nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
URL:
http://svn.apache.org/viewvc/nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java?rev=1494496&view=auto
==============================================================================
---
nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
(added)
+++
nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
Wed Jun 19 08:31:28 2013
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.indexwriter.elastic;
+
+import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.nutch.indexer.NutchDocument;
+import org.apache.nutch.indexer.IndexWriter;
+import org.elasticsearch.ElasticSearchException;
+import org.elasticsearch.action.ListenableActionFuture;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.delete.DeleteRequestBuilder;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.node.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class ElasticIndexWriter implements IndexWriter {
+ public static Logger LOG = LoggerFactory.getLogger(ElasticIndexWriter.class);
+
+ private static final int DEFAULT_MAX_BULK_DOCS = 250;
+ private static final int DEFAULT_MAX_BULK_LENGTH = 2500500;
+
+ private Client client;
+ private Node node;
+ private String defaultIndex;
+
+ private Configuration config;
+
+ private BulkRequestBuilder bulk;
+ private ListenableActionFuture<BulkResponse> execute;
+ private int port = -1;
+ private String host = null;
+ private String clusterName = null;
+ private int maxBulkDocs;
+ private int maxBulkLength;
+ private long indexedDocs = 0;
+ private int bulkDocs = 0;
+ private int bulkLength = 0;
+ private boolean createNewBulk = false;
+
+ @Override
+ public void open(JobConf job, String name) throws IOException {
+ clusterName = job.get(ElasticConstants.CLUSTER);
+ host = job.get(ElasticConstants.HOST);
+ port = job.getInt(ElasticConstants.PORT, -1);
+
+ // Prefer TransportClient
+ if (host != null && port > 1) {
+ Settings settings =
ImmutableSettings.settingsBuilder().put("cluster.name", clusterName).build();
+ client = new TransportClient(settings).addTransportAddress(new
InetSocketTransportAddress(host, port));
+ } else if (clusterName != null) {
+ node = nodeBuilder().clusterName(clusterName).client(true).node();
+ client = node.client();
+ }
+
+ bulk = client.prepareBulk();
+ defaultIndex = job.get(ElasticConstants.INDEX, "nutch");
+ maxBulkDocs = job.getInt(
+ ElasticConstants.MAX_BULK_DOCS, DEFAULT_MAX_BULK_DOCS);
+ maxBulkLength = job.getInt(
+ ElasticConstants.MAX_BULK_LENGTH, DEFAULT_MAX_BULK_LENGTH);
+ }
+
+ @Override
+ public void write(NutchDocument doc) throws IOException {
+ String id = (String)doc.getFieldValue("url");
+ String type = doc.getDocumentMeta().get("type");
+ if (type == null) type = "doc";
+ IndexRequestBuilder request = client.prepareIndex(defaultIndex, type, id);
+
+ Map<String, Object> source = new HashMap<String, Object>();
+
+ // Loop through all fields of this doc
+ for (String fieldName : doc.getFieldNames()) {
+ if (doc.getField(fieldName).getValues().size() > 1) {
+ source.put(fieldName, doc.getFieldValue(fieldName));
+ // Loop through the values to keep track of the size of this document
+ for (Object value : doc.getField(fieldName).getValues()) {
+ bulkLength += value.toString().length();
+ }
+ } else {
+ source.put(fieldName, doc.getFieldValue(fieldName));
+ bulkLength += doc.getFieldValue(fieldName).toString().length();
+ }
+ }
+ request.setSource(source);
+
+ // Add this indexing request to a bulk request
+ bulk.add(request);
+ indexedDocs++;
+ bulkDocs++;
+
+ if (bulkDocs >= maxBulkDocs || bulkLength >= maxBulkLength) {
+ LOG.info("Processing bulk request [docs = " + bulkDocs + ", length = "
+ + bulkLength + ", total docs = " + indexedDocs
+ + ", last doc in bulk = '" + id + "']");
+ // Flush the bulk of indexing requests
+ createNewBulk = true;
+ commit();
+ }
+ }
+
+
+ @Override
+ public void delete(String key) throws IOException {
+ try{
+ DeleteRequestBuilder builder = client.prepareDelete();
+ builder.setId(key);
+ builder.execute().actionGet();
+ }catch(ElasticSearchException e)
+ {
+ throw makeIOException(e);
+ }
+ }
+
+ public static IOException makeIOException(ElasticSearchException e) {
+ final IOException ioe = new IOException();
+ ioe.initCause(e);
+ return ioe;
+ }
+
+ @Override
+ public void update(NutchDocument doc) throws IOException {
+ write(doc);
+ }
+
+ @Override
+ public void commit() throws IOException {
+ if (execute != null) {
+ // wait for previous to finish
+ long beforeWait = System.currentTimeMillis();
+ BulkResponse actionGet = execute.actionGet();
+ if (actionGet.hasFailures()) {
+ for (BulkItemResponse item : actionGet) {
+ if (item.isFailed()) {
+ throw new RuntimeException("First failure in bulk: "
+ + item.getFailureMessage());
+ }
+ }
+ }
+ long msWaited = System.currentTimeMillis() - beforeWait;
+ LOG.info("Previous took in ms " + actionGet.getTookInMillis()
+ + ", including wait " + msWaited);
+ execute = null;
+ }
+ if (bulk != null) {
+ if (bulkDocs > 0) {
+ // start a flush, note that this is an asynchronous call
+ execute = bulk.execute();
+ }
+ bulk = null;
+ }
+ if (createNewBulk) {
+ // Prepare a new bulk request
+ bulk = client.prepareBulk();
+ bulkDocs = 0;
+ bulkLength = 0;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ // Flush pending requests
+ LOG.info("Processing remaining requests [docs = " + bulkDocs
+ + ", length = " + bulkLength + ", total docs = " + indexedDocs +
"]");
+ createNewBulk = false;
+ commit();
+ // flush one more time to finalize the last bulk
+ LOG.info("Processing to finalize last execute");
+ createNewBulk = false;
+ commit();
+
+ // Close
+ client.close();
+ if (node != null) {
+ node.close();
+ }
+ }
+
+ @Override
+ public String describe() {
+ StringBuffer sb = new StringBuffer("ElasticIndexWriter\n");
+ sb.append("\t").append(ElasticConstants.CLUSTER).append(" : elastic prefix
cluster\n");
+ sb.append("\t").append(ElasticConstants.HOST).append(" : hostname\n");
+ sb.append("\t").append(ElasticConstants.PORT).append(" : port\n");
+ sb.append("\t").append(ElasticConstants.INDEX).append(" : elastic index
command \n");
+ sb.append("\t").append(ElasticConstants.MAX_BULK_DOCS).append(" : elastic
bulk index doc counts. (default 250) \n");
+ sb.append("\t").append(ElasticConstants.MAX_BULK_LENGTH).append(" :
elastic bulk index length. (default 2500500 ~2.5MB)\n");
+ return sb.toString();
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ config = conf;
+ String cluster = conf.get(ElasticConstants.CLUSTER);
+ if (cluster == null) {
+ String message = "Missing elastic.cluster. Should be set in
nutch-site.xml ";
+ message+="\n"+describe();
+ LOG.error(message);
+ throw new RuntimeException(message);
+ }
+ }
+
+ @Override
+ public Configuration getConf() {
+ return config;
+ }
+}