Author: markus
Date: Wed Jun 19 08:31:28 2013
New Revision: 1494496

URL: http://svn.apache.org/r1494496
Log:
NUTCH-1527 Elasticsearch indexer

Added:
    nutch/trunk/src/plugin/indexer-elastic/
    nutch/trunk/src/plugin/indexer-elastic/build.xml
    nutch/trunk/src/plugin/indexer-elastic/ivy.xml
    nutch/trunk/src/plugin/indexer-elastic/plugin.xml
    nutch/trunk/src/plugin/indexer-elastic/src/
    nutch/trunk/src/plugin/indexer-elastic/src/java/
    nutch/trunk/src/plugin/indexer-elastic/src/java/org/
    nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/
    nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/
    
nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/
    
nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/
    
nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java
    
nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
Modified:
    nutch/trunk/CHANGES.txt
    nutch/trunk/conf/log4j.properties
    nutch/trunk/conf/nutch-default.xml
    nutch/trunk/src/plugin/build.xml

Modified: nutch/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/nutch/trunk/CHANGES.txt?rev=1494496&r1=1494495&r2=1494496&view=diff
==============================================================================
--- nutch/trunk/CHANGES.txt (original)
+++ nutch/trunk/CHANGES.txt Wed Jun 19 08:31:28 2013
@@ -2,6 +2,8 @@ Nutch Change Log
 
 (trunk): Current Development
 
+* NUTCH-1527 Elasticsearch indexer (lufeng + markus)
+
 * NUTCH-1475 Index-More Plugin -- A better fall back value for date field 
(James Sullivan, snagel via lewismc)
 
 * NUTCH-1560 index-metadata to add all values of multivalued metadata (snagel)

Modified: nutch/trunk/conf/log4j.properties
URL: 
http://svn.apache.org/viewvc/nutch/trunk/conf/log4j.properties?rev=1494496&r1=1494495&r2=1494496&view=diff
==============================================================================
--- nutch/trunk/conf/log4j.properties (original)
+++ nutch/trunk/conf/log4j.properties Wed Jun 19 08:31:28 2013
@@ -22,6 +22,7 @@ log4j.logger.org.apache.nutch.segment.Se
 log4j.logger.org.apache.nutch.crawl.CrawlDb=INFO,cmdstdout
 log4j.logger.org.apache.nutch.crawl.LinkDb=INFO,cmdstdout
 log4j.logger.org.apache.nutch.crawl.LinkDbMerger=INFO,cmdstdout
+log4j.logger.org.apache.nutch.indexer.IndexingJob=INFO,cmdstdout
 log4j.logger.org.apache.nutch.indexer.solr.SolrIndexer=INFO,cmdstdout
 log4j.logger.org.apache.nutch.indexer.solr.SolrWriter=INFO,cmdstdout
 log4j.logger.org.apache.nutch.indexer.solr.SolrDeleteDuplicates=INFO,cmdstdout

Modified: nutch/trunk/conf/nutch-default.xml
URL: 
http://svn.apache.org/viewvc/nutch/trunk/conf/nutch-default.xml?rev=1494496&r1=1494495&r2=1494496&view=diff
==============================================================================
--- nutch/trunk/conf/nutch-default.xml (original)
+++ nutch/trunk/conf/nutch-default.xml Wed Jun 19 08:31:28 2013
@@ -1413,6 +1413,46 @@
   </description>
 </property>
 
+<!-- Elasticsearch properties -->
+
+<property>
+  <name>elastic.host</name>
+  <value></value>
+  <description>The hostname to send documents to using TransportClient.
+  Either host and port, or cluster, must be defined.</description>
+</property>
+
+<property> 
+  <name>elastic.port</name>
+  <value>9300</value>
+  <description>The port to connect to using TransportClient.</description>
+</property>
+
+<property> 
+  <name>elastic.cluster</name>
+  <value></value>
+  <description>The cluster name to discover. Either host and port, or
+  cluster, must be defined.</description>
+</property>
+
+<property> 
+  <name>elastic.index</name>
+  <value>nutch</value> 
+  <description>Default index to send documents to.</description>
+</property>
+
+<property> 
+  <name>elastic.max.bulk.docs</name>
+  <value>250</value> 
+  <description>Maximum size of the bulk in number of documents.</description>
+</property>
+
+<property> 
+  <name>elastic.max.bulk.size</name>
+  <value>2500500</value> 
+  <description>Maximum size of the bulk in bytes.</description>
+</property>
+
 <!-- subcollection properties -->
 
 <property>

Modified: nutch/trunk/src/plugin/build.xml
URL: 
http://svn.apache.org/viewvc/nutch/trunk/src/plugin/build.xml?rev=1494496&r1=1494495&r2=1494496&view=diff
==============================================================================
--- nutch/trunk/src/plugin/build.xml (original)
+++ nutch/trunk/src/plugin/build.xml Wed Jun 19 08:31:28 2013
@@ -34,6 +34,7 @@
      <ant dir="index-more" target="deploy"/>
      <ant dir="index-static" target="deploy"/>
      <ant dir="index-metadata" target="deploy"/>
+     <ant dir="indexer-elastic" target="deploy"/>
      <ant dir="indexer-solr" target="deploy"/>
      <ant dir="language-identifier" target="deploy"/>
      <ant dir="lib-http" target="deploy"/>
@@ -119,6 +120,7 @@
     <ant dir="index-more" target="clean"/>
     <ant dir="index-static" target="clean"/>
     <ant dir="index-metadata" target="clean"/>
+    <ant dir="indexer-elastic" target="clean"/>
     <ant dir="indexer-solr" target="clean"/>
     <ant dir="language-identifier" target="clean"/>
     <ant dir="lib-commons-httpclient" target="clean"/>

Added: nutch/trunk/src/plugin/indexer-elastic/build.xml
URL: 
http://svn.apache.org/viewvc/nutch/trunk/src/plugin/indexer-elastic/build.xml?rev=1494496&view=auto
==============================================================================
--- nutch/trunk/src/plugin/indexer-elastic/build.xml (added)
+++ nutch/trunk/src/plugin/indexer-elastic/build.xml Wed Jun 19 08:31:28 2013
@@ -0,0 +1,22 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project name="indexer-elastic" default="jar-core">
+
+  <import file="../build-plugin.xml" />
+
+</project>

Added: nutch/trunk/src/plugin/indexer-elastic/ivy.xml
URL: 
http://svn.apache.org/viewvc/nutch/trunk/src/plugin/indexer-elastic/ivy.xml?rev=1494496&view=auto
==============================================================================
--- nutch/trunk/src/plugin/indexer-elastic/ivy.xml (added)
+++ nutch/trunk/src/plugin/indexer-elastic/ivy.xml Wed Jun 19 08:31:28 2013
@@ -0,0 +1,43 @@
+<?xml version="1.0" ?>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<ivy-module version="1.0">
+  <info organisation="org.apache.nutch" module="${ant.project.name}">
+    <license name="Apache 2.0"/>
+    <ivyauthor name="Apache Nutch Team" url="http://nutch.apache.org"/>
+    <description>
+        Apache Nutch
+    </description>
+  </info>
+
+  <configurations>
+    <include file="../../..//ivy/ivy-configurations.xml"/>
+  </configurations>
+
+  <publications>
+    <!--get the artifact from our module name-->
+    <artifact conf="master"/>
+  </publications>
+
+  <dependencies>
+        <dependency org="org.elasticsearch" name="elasticsearch" rev="0.90.1"
+                    conf="*->default"/>
+  </dependencies>
+  
+</ivy-module>

Added: nutch/trunk/src/plugin/indexer-elastic/plugin.xml
URL: 
http://svn.apache.org/viewvc/nutch/trunk/src/plugin/indexer-elastic/plugin.xml?rev=1494496&view=auto
==============================================================================
--- nutch/trunk/src/plugin/indexer-elastic/plugin.xml (added)
+++ nutch/trunk/src/plugin/indexer-elastic/plugin.xml Wed Jun 19 08:31:28 2013
@@ -0,0 +1,55 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<plugin id="indexer-elastic" name="ElasticIndexWriter" version="1.0.0"
+  provider-name="nutch.apache.org">
+
+  <runtime>
+    <library name="indexer-elastic.jar">
+      <export name="*" />
+    </library>
+    
+    <library name="elasticsearch-0.90.1.jar"/>
+    <library name="jna-3.3.0.jar"/>
+    <library name="jts-1.12.jar"/>
+    <library name="log4j-1.2.17.jar"/>
+    <library name="lucene-codecs-4.3.0.jar"/>
+    <library name="lucene-core-4.3.0.jar"/>
+    <library name="lucene-grouping-4.3.0.jar"/>
+    <library name="lucene-highlighter-4.3.0.jar"/>
+    <library name="lucene-join-4.3.0.jar"/>
+    <library name="lucene-memory-4.3.0.jar"/>
+    <library name="lucene-queries-4.3.0.jar"/>
+    <library name="lucene-queryparser-4.3.0.jar"/>
+    <library name="lucene-sandbox-4.3.0.jar"/>
+    <library name="lucene-spatial-4.3.0.jar"/>
+    <library name="lucene-suggest-4.3.0.jar"/>
+    <library name="spatial4j-0.3.jar"/>
+  </runtime>
+
+  <requires>
+    <import plugin="nutch-extensionpoints" />
+  </requires>
+
+  <extension id="org.apache.nutch.indexer.elastic"
+    name="Elasticsearch Index Writer"
+    point="org.apache.nutch.indexer.IndexWriter">
+    <implementation id="ElasticIndexWriter"
+      class="org.apache.nutch.indexwriter.elastic.ElasticIndexWriter" />
+  </extension>
+
+</plugin>

Added: 
nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java
URL: 
http://svn.apache.org/viewvc/nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java?rev=1494496&view=auto
==============================================================================
--- 
nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java
 (added)
+++ 
nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java
 Wed Jun 19 08:31:28 2013
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.indexwriter.elastic;
+
+/**
+ * Names of the configuration properties read by the Elasticsearch index
+ * writer. Every key is built from the common {@link #ELASTIC_PREFIX}.
+ */
+public interface ElasticConstants {
+  /** Prefix shared by all Elasticsearch related property names. */
+  String ELASTIC_PREFIX = "elastic.";
+
+  /** Property holding the host name to connect to. */
+  String HOST = ELASTIC_PREFIX + "host";
+  /** Property holding the port to connect to. */
+  String PORT = ELASTIC_PREFIX + "port";
+  /** Property holding the cluster name. */
+  String CLUSTER = ELASTIC_PREFIX + "cluster";
+  /** Property holding the name of the index documents are sent to. */
+  String INDEX = ELASTIC_PREFIX + "index";
+  /** Property holding the maximum number of documents per bulk request. */
+  String MAX_BULK_DOCS = ELASTIC_PREFIX + "max.bulk.docs";
+  /** Property holding the maximum size of a bulk request. */
+  String MAX_BULK_LENGTH = ELASTIC_PREFIX + "max.bulk.size";
+}
\ No newline at end of file

Added: 
nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
URL: 
http://svn.apache.org/viewvc/nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java?rev=1494496&view=auto
==============================================================================
--- 
nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
 (added)
+++ 
nutch/trunk/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
 Wed Jun 19 08:31:28 2013
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.nutch.indexwriter.elastic;
+
+import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.nutch.indexer.NutchDocument;
+import org.apache.nutch.indexer.IndexWriter;
+import org.elasticsearch.ElasticSearchException;
+import org.elasticsearch.action.ListenableActionFuture;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.delete.DeleteRequestBuilder;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.node.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class ElasticIndexWriter implements IndexWriter {
+  public static Logger LOG = LoggerFactory.getLogger(ElasticIndexWriter.class);
+
+  private static final int DEFAULT_MAX_BULK_DOCS = 250;
+  private static final int DEFAULT_MAX_BULK_LENGTH = 2500500;
+
+  private Client client;
+  private Node node;
+  private String defaultIndex;
+
+  private Configuration config;
+
+  private BulkRequestBuilder bulk;
+  private ListenableActionFuture<BulkResponse> execute;
+  private int port = -1;
+  private String host = null;
+  private String clusterName = null;
+  private int maxBulkDocs;
+  private int maxBulkLength;
+  private long indexedDocs = 0;
+  private int bulkDocs = 0;
+  private int bulkLength = 0;
+  private boolean createNewBulk = false;
+
+  @Override
+  public void open(JobConf job, String name) throws IOException {
+    clusterName = job.get(ElasticConstants.CLUSTER);
+    host = job.get(ElasticConstants.HOST);
+    port = job.getInt(ElasticConstants.PORT, -1);
+    
+    // Prefer TransportClient
+    if (host != null && port > 1) {
+      Settings settings = 
ImmutableSettings.settingsBuilder().put("cluster.name", clusterName).build();
+      client = new TransportClient(settings).addTransportAddress(new 
InetSocketTransportAddress(host, port));
+    } else if (clusterName != null) {
+      node = nodeBuilder().clusterName(clusterName).client(true).node();
+      client = node.client();
+    }
+
+    bulk = client.prepareBulk();
+    defaultIndex = job.get(ElasticConstants.INDEX, "nutch");
+    maxBulkDocs = job.getInt(
+            ElasticConstants.MAX_BULK_DOCS, DEFAULT_MAX_BULK_DOCS);
+    maxBulkLength = job.getInt(
+            ElasticConstants.MAX_BULK_LENGTH, DEFAULT_MAX_BULK_LENGTH);
+  }
+
+  @Override
+  public void write(NutchDocument doc) throws IOException {
+    String id = (String)doc.getFieldValue("url");
+    String type = doc.getDocumentMeta().get("type");
+    if (type == null) type = "doc";
+    IndexRequestBuilder request = client.prepareIndex(defaultIndex, type, id);
+
+    Map<String, Object> source = new HashMap<String, Object>();
+
+    // Loop through all fields of this doc
+    for (String fieldName : doc.getFieldNames()) {
+      if (doc.getField(fieldName).getValues().size() > 1) {
+        source.put(fieldName, doc.getFieldValue(fieldName));
+        // Loop through the values to keep track of the size of this document
+        for (Object value : doc.getField(fieldName).getValues()) {
+          bulkLength += value.toString().length();
+        }
+      } else {
+        source.put(fieldName, doc.getFieldValue(fieldName));
+        bulkLength += doc.getFieldValue(fieldName).toString().length();
+      }
+    }
+    request.setSource(source);
+
+    // Add this indexing request to a bulk request
+    bulk.add(request);
+    indexedDocs++;
+    bulkDocs++;
+
+    if (bulkDocs >= maxBulkDocs || bulkLength >= maxBulkLength) {
+      LOG.info("Processing bulk request [docs = " + bulkDocs + ", length = "
+              + bulkLength + ", total docs = " + indexedDocs
+              + ", last doc in bulk = '" + id + "']");
+      // Flush the bulk of indexing requests
+      createNewBulk = true;
+      commit();
+    }
+  }
+
+
+  @Override
+  public void delete(String key) throws IOException {
+    try{
+      DeleteRequestBuilder builder =  client.prepareDelete();
+      builder.setId(key);
+      builder.execute().actionGet();
+    }catch(ElasticSearchException e)
+    {
+      throw makeIOException(e);
+    }
+  }
+
+  public static IOException makeIOException(ElasticSearchException e) {
+    final IOException ioe = new IOException();
+    ioe.initCause(e);
+    return ioe;
+  }
+
+  @Override
+  public void update(NutchDocument doc) throws IOException {
+    write(doc);
+  }
+
+  @Override
+  public void commit() throws IOException {
+    if (execute != null) {
+      // wait for previous to finish
+      long beforeWait = System.currentTimeMillis();
+      BulkResponse actionGet = execute.actionGet();
+      if (actionGet.hasFailures()) {
+        for (BulkItemResponse item : actionGet) {
+          if (item.isFailed()) {
+            throw new RuntimeException("First failure in bulk: "
+                    + item.getFailureMessage());
+          }
+        }
+      }
+      long msWaited = System.currentTimeMillis() - beforeWait;
+      LOG.info("Previous took in ms " + actionGet.getTookInMillis()
+              + ", including wait " + msWaited);
+      execute = null;
+    }
+    if (bulk != null) {
+      if (bulkDocs > 0) {
+        // start a flush, note that this is an asynchronous call
+        execute = bulk.execute();
+      }
+      bulk = null;
+    }
+    if (createNewBulk) {
+      // Prepare a new bulk request
+      bulk = client.prepareBulk();
+      bulkDocs = 0;
+      bulkLength = 0;
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    // Flush pending requests
+    LOG.info("Processing remaining requests [docs = " + bulkDocs
+            + ", length = " + bulkLength + ", total docs = " + indexedDocs + 
"]");
+    createNewBulk = false;
+    commit();
+    // flush one more time to finalize the last bulk
+    LOG.info("Processing to finalize last execute");
+    createNewBulk = false;
+    commit();
+
+    // Close
+    client.close();
+    if (node != null) {
+      node.close();
+    }
+  }
+
+  @Override
+  public String describe() {
+    StringBuffer sb = new StringBuffer("ElasticIndexWriter\n");
+    sb.append("\t").append(ElasticConstants.CLUSTER).append(" : elastic prefix 
cluster\n");
+    sb.append("\t").append(ElasticConstants.HOST).append(" : hostname\n");
+    sb.append("\t").append(ElasticConstants.PORT).append(" : port\n");
+    sb.append("\t").append(ElasticConstants.INDEX).append(" : elastic index 
command \n");
+    sb.append("\t").append(ElasticConstants.MAX_BULK_DOCS).append(" : elastic 
bulk index doc counts. (default 250) \n");
+    sb.append("\t").append(ElasticConstants.MAX_BULK_LENGTH).append(" : 
elastic bulk index length. (default 2500500 ~2.5MB)\n");
+    return sb.toString();
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    config = conf;
+    String cluster = conf.get(ElasticConstants.CLUSTER);
+    if (cluster == null) {
+      String message = "Missing elastic.cluster. Should be set in 
nutch-site.xml ";
+      message+="\n"+describe();
+      LOG.error(message);
+      throw new RuntimeException(message);
+    }
+  }
+    
+  @Override
+  public Configuration getConf() {
+    return config;
+  }
+} 


Reply via email to