Author: ferdy
Date: Wed Aug  1 14:24:14 2012
New Revision: 1368016

URL: http://svn.apache.org/viewvc?rev=1368016&view=rev
Log:
NUTCH-1445 Add ElasticIndexerJob that indexes to elasticsearch

Added:
    nutch/branches/2.x/src/java/org/apache/nutch/indexer/elastic/
    
nutch/branches/2.x/src/java/org/apache/nutch/indexer/elastic/ElasticConstants.java
   (with props)
    
nutch/branches/2.x/src/java/org/apache/nutch/indexer/elastic/ElasticIndexerJob.java
   (with props)
    
nutch/branches/2.x/src/java/org/apache/nutch/indexer/elastic/ElasticWriter.java 
  (with props)
Modified:
    nutch/branches/2.x/CHANGES.txt
    nutch/branches/2.x/ivy/ivy.xml
    nutch/branches/2.x/ivy/ivysettings.xml

Modified: nutch/branches/2.x/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/nutch/branches/2.x/CHANGES.txt?rev=1368016&r1=1368015&r2=1368016&view=diff
==============================================================================
--- nutch/branches/2.x/CHANGES.txt (original)
+++ nutch/branches/2.x/CHANGES.txt Wed Aug  1 14:24:14 2012
@@ -1,6 +1,8 @@
 Nutch Change Log
 
 Release 2.1 - Current Development
+* NUTCH-1445 Add ElasticIndexerJob that indexes to elasticsearch (ferdy)
+
 * NUTCH-1444 Indexing should not create temporary files (do not extend from 
FileOutputFormat) (ferdy)
 
 * NUTCH-1443 Solr schema version is invalid (markus)

Modified: nutch/branches/2.x/ivy/ivy.xml
URL: 
http://svn.apache.org/viewvc/nutch/branches/2.x/ivy/ivy.xml?rev=1368016&r1=1368015&r2=1368016&view=diff
==============================================================================
--- nutch/branches/2.x/ivy/ivy.xml (original)
+++ nutch/branches/2.x/ivy/ivy.xml Wed Aug  1 14:24:14 2012
@@ -32,6 +32,9 @@
   </publications>
 
   <dependencies>
+    <dependency org="org.elasticsearch" name="elasticsearch" rev="0.19.4" 
+                conf="*->default,sources"/>
+  
     <dependency org="org.apache.solr" name="solr-solrj" rev="3.4.0"
       conf="*->default" />
     <dependency org="org.slf4j" name="slf4j-log4j12" rev="1.6.1"

Modified: nutch/branches/2.x/ivy/ivysettings.xml
URL: 
http://svn.apache.org/viewvc/nutch/branches/2.x/ivy/ivysettings.xml?rev=1368016&r1=1368015&r2=1368016&view=diff
==============================================================================
--- nutch/branches/2.x/ivy/ivysettings.xml (original)
+++ nutch/branches/2.x/ivy/ivysettings.xml Wed Aug  1 14:24:14 2012
@@ -29,6 +29,9 @@
           http://ibiblio.lsu.edu/main/pub/packages/maven2
           http://www.ibiblio.net/pub/packages/maven2
   -->
+  <property name="oss.sonatype.org"
+    value="http://oss.sonatype.org/content/repositories/releases/"
+    override="false"/>
   <property name="repo.maven.org"
     value="http://repo1.maven.org/maven2/"
     override="false"/>
@@ -58,19 +61,28 @@
       pattern="${maven2.pattern.ext}"
       m2compatible="true"
       />
+     <ibiblio name="sonatype"
+      root="${oss.sonatype.org}"
+      pattern="${maven2.pattern.ext}"
+      m2compatible="true"
+      />
+     
     <chain name="default" dual="true">
       <resolver ref="local"/>
       <resolver ref="maven2"/>
+      <resolver ref="sonatype"/>
     </chain>
     <chain name="internal">
       <resolver ref="local"/>
     </chain>
     <chain name="external">
       <resolver ref="maven2"/>
+      <resolver ref="sonatype"/>
     </chain>
     <chain name="external-and-snapshots">
       <resolver ref="maven2"/>
       <resolver ref="apache-snapshot"/>
+      <resolver ref="sonatype"/>
     </chain>
     <chain name="restletchain">
       <resolver ref="restlet"/>

Added: 
nutch/branches/2.x/src/java/org/apache/nutch/indexer/elastic/ElasticConstants.java
URL: 
http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/indexer/elastic/ElasticConstants.java?rev=1368016&view=auto
==============================================================================
--- 
nutch/branches/2.x/src/java/org/apache/nutch/indexer/elastic/ElasticConstants.java
 (added)
+++ 
nutch/branches/2.x/src/java/org/apache/nutch/indexer/elastic/ElasticConstants.java
 Wed Aug  1 14:24:14 2012
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.indexer.elastic;
+
+/**
+ * Configuration keys understood by the elasticsearch indexer. Every key lives
+ * in the {@link #ELASTIC_PREFIX} namespace.
+ */
+public interface ElasticConstants {
+
+  /** Namespace prepended to every elasticsearch configuration key. */
+  String ELASTIC_PREFIX = "elastic.";
+
+  /** Key holding the name of the elasticsearch cluster to join. */
+  String CLUSTER = ELASTIC_PREFIX + "cluster";
+
+  /** Key holding the name of the index documents are written to. */
+  String INDEX = ELASTIC_PREFIX + "index";
+
+  /** Key holding the maximum number of documents per bulk request. */
+  String MAX_BULK_DOCS = ELASTIC_PREFIX + "max.bulk.docs";
+
+  /**
+   * Key holding the maximum accumulated length of a bulk request. Note that
+   * the key string ends in {@code max.bulk.size} even though the constant is
+   * named LENGTH; the string is what configurations refer to.
+   */
+  String MAX_BULK_LENGTH = ELASTIC_PREFIX + "max.bulk.size";
+}

Propchange: 
nutch/branches/2.x/src/java/org/apache/nutch/indexer/elastic/ElasticConstants.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: 
nutch/branches/2.x/src/java/org/apache/nutch/indexer/elastic/ElasticIndexerJob.java
URL: 
http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/indexer/elastic/ElasticIndexerJob.java?rev=1368016&view=auto
==============================================================================
--- 
nutch/branches/2.x/src/java/org/apache/nutch/indexer/elastic/ElasticIndexerJob.java
 (added)
+++ 
nutch/branches/2.x/src/java/org/apache/nutch/indexer/elastic/ElasticIndexerJob.java
 Wed Aug  1 14:24:14 2012
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.indexer.elastic;
+
+import java.util.Map;
+
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.indexer.IndexerJob;
+import org.apache.nutch.indexer.NutchIndexWriterFactory;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.ToolUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Indexer for elasticsearch. Uses bulk operations with flushing in the 
background,
+ * keeping track of elasticsearch responses by checking after every flush. 
When a 
+ * previous flush has not finished yet before the next bulk is full, it will 
wait for it.
+ * This mechanism will keep the servers from overloading.
+ */
+public class ElasticIndexerJob extends IndexerJob {
+
+  public static Logger LOG = LoggerFactory.getLogger(ElasticIndexerJob.class);
+
+  @Override
+  public Map<String,Object> run(Map<String,Object> args) throws Exception {
+    LOG.info("Starting");
+    
+    NutchIndexWriterFactory.addClassToConf(getConf(), ElasticWriter.class);
+    String batchId = (String)args.get(Nutch.ARG_BATCH);    
+    String clusterName = (String)args.get(ElasticConstants.CLUSTER);
+    
+    getConf().set(ElasticConstants.CLUSTER, clusterName);
+    
+    currentJob = createIndexJob(getConf(), "elastic-index [" + clusterName + 
"]", batchId);
+    
+    currentJob.waitForCompletion(true);
+    ToolUtil.recordJobStatus(null, currentJob, results);
+    
+    LOG.info("Done");
+    return results;
+  }
+
+  public void indexElastic(String clusterName, String batchId) throws 
Exception {
+    run(ToolUtil.toArgMap(ElasticConstants.CLUSTER, clusterName,
+                          Nutch.ARG_BATCH, batchId));
+  }
+
+  public int run(String[] args) throws Exception {
+    if (args.length == 2) {
+      //ok
+    } else if (args.length == 4 && "-crawlId".equals(args[2])) {
+      getConf().set(Nutch.CRAWL_ID_KEY, args[3]);
+    } else {
+      System.err.println("Usage: <elastic cluster name> (<batchId> | -all | 
-reindex) [-crawlId <id>]");
+      return -1;      
+    }
+    indexElastic(args[0], args[1]);
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(), new 
ElasticIndexerJob(), args);
+    System.exit(res);
+  }
+}

Propchange: 
nutch/branches/2.x/src/java/org/apache/nutch/indexer/elastic/ElasticIndexerJob.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: 
nutch/branches/2.x/src/java/org/apache/nutch/indexer/elastic/ElasticWriter.java
URL: 
http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/indexer/elastic/ElasticWriter.java?rev=1368016&view=auto
==============================================================================
--- 
nutch/branches/2.x/src/java/org/apache/nutch/indexer/elastic/ElasticWriter.java 
(added)
+++ 
nutch/branches/2.x/src/java/org/apache/nutch/indexer/elastic/ElasticWriter.java 
Wed Aug  1 14:24:14 2012
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.indexer.elastic;
+
+import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.nutch.indexer.NutchDocument;
+import org.apache.nutch.indexer.NutchIndexWriter;
+import org.apache.nutch.util.TableUtil;
+import org.elasticsearch.action.ListenableActionFuture;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.node.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link NutchIndexWriter} that sends documents to elasticsearch through a
+ * client-only node, batching index requests into bulk requests.
+ * <p>
+ * A bulk is submitted asynchronously once it holds {@code maxBulkDocs}
+ * documents or the summed length of its field values reaches
+ * {@code maxBulkLength}. Before the next bulk is submitted the previous one is
+ * awaited and its response checked, so at most one bulk is in flight at a time.
+ */
+public class ElasticWriter implements NutchIndexWriter {
+
+  public static Logger LOG = LoggerFactory.getLogger(ElasticWriter.class);
+
+  private static final int DEFAULT_MAX_BULK_DOCS = 500;
+  // Compared against the summed character count of field values, not bytes.
+  private static final int DEFAULT_MAX_BULK_LENGTH = 5001001; // ~5MB
+
+  private Client client;
+  private Node node;
+  private String defaultIndex;
+
+  /** Bulk currently being filled; set to null when released during close(). */
+  private BulkRequestBuilder bulk;
+  /** Pending response of the last submitted bulk, or null if none in flight. */
+  private ListenableActionFuture<BulkResponse> execute;
+  private int maxBulkDocs;
+  private int maxBulkLength;
+  /** Total number of documents added to bulks by this writer. */
+  private long indexedDocs = 0;
+  /** Number of documents in the current bulk. */
+  private int bulkDocs = 0;
+  /** Summed length of the field values in the current bulk. */
+  private int bulkLength = 0;
+
+  /**
+   * Adds a document to the current bulk and flushes the bulk when it is full.
+   * The document id is the reversed value of its {@code url} field; its type
+   * is taken from the {@code type} entry of the document meta.
+   *
+   * @throws RuntimeException if a previously submitted bulk reports a failure
+   */
+  @Override
+  public void write(NutchDocument doc) throws IOException {
+    String id = TableUtil.reverseUrl(doc.getFieldValue("url"));
+    IndexRequestBuilder request = client.prepareIndex(defaultIndex, doc
+        .getDocumentMeta().get("type"), id);
+
+    Map<String, Object> source = new HashMap<String, Object>();
+
+    // Multi-valued fields are stored as their value collection, single-valued
+    // ones as a plain string; every value's length counts towards the bulk
+    // length limit.
+    for (String fieldName : doc.getFieldNames()) {
+      if (doc.getFieldValues(fieldName).size() > 1) {
+        source.put(fieldName, doc.getFieldValues(fieldName));
+        for (String value : doc.getFieldValues(fieldName)) {
+          bulkLength += value.length();
+        }
+      } else {
+        source.put(fieldName, doc.getFieldValue(fieldName));
+        bulkLength += doc.getFieldValue(fieldName).length();
+      }
+    }
+    request.setSource(source);
+
+    // Add this indexing request to a bulk request
+    bulk.add(request);
+    indexedDocs++;
+    bulkDocs++;
+
+    if (bulkDocs >= maxBulkDocs || bulkLength >= maxBulkLength) {
+      LOG.info("Processing bulk request [docs = " + bulkDocs + ", length = "
+          + bulkLength + ", total docs = " + indexedDocs
+          + ", last doc in bulk = '" + id + "']");
+      // Flush the bulk of indexing requests
+      processExecute(true);
+    }
+  }
+
+  /**
+   * Waits for the in-flight bulk (if any) and fails on its first item error,
+   * then submits the current bulk asynchronously when it holds documents.
+   *
+   * @param createNewBulk whether to start a fresh, empty bulk afterwards; when
+   *          false the current bulk is released and not replaced
+   * @throws RuntimeException describing the first failed item of the awaited
+   *           bulk
+   */
+  private void processExecute(boolean createNewBulk) {
+    if (execute != null) {
+      // wait for previous to finish
+      long beforeWait = System.currentTimeMillis();
+      BulkResponse actionGet = execute.actionGet();
+      if (actionGet.hasFailures()) {
+        for (BulkItemResponse item : actionGet) {
+          if (item.failed()) {
+            throw new RuntimeException("First failure in bulk: "
+                + item.getFailureMessage());
+          }
+        }
+      }
+      long msWaited = System.currentTimeMillis() - beforeWait;
+      LOG.info("Previous took in ms " + actionGet.getTookInMillis()
+          + ", including wait " + msWaited);
+      execute = null;
+    }
+    if (bulk != null) {
+      if (bulkDocs > 0) {
+        // start a flush, note that this is an asynchronous call
+        execute = bulk.execute();
+      }
+      bulk = null;
+    }
+    if (createNewBulk) {
+      // Prepare a new bulk request
+      bulk = client.prepareBulk();
+      bulkDocs = 0;
+      bulkLength = 0;
+    }
+  }
+
+  /**
+   * Submits the remaining documents, waits until the last bulk has been
+   * acknowledged (failing on any item error), and always shuts down the
+   * client and node.
+   */
+  @Override
+  public void close() throws IOException {
+    try {
+      // Flush pending requests: waits for the in-flight bulk, then submits
+      // the current (partial) bulk asynchronously.
+      LOG.info("Processing remaining requests [docs = " + bulkDocs
+          + ", length = " + bulkLength + ", total docs = " + indexedDocs + "]");
+      processExecute(false);
+      // flush one more time to finalize the last bulk: without this second
+      // call the bulk submitted above would never be awaited or checked
+      // before the node is shut down.
+      LOG.info("Processing to finalize last execute");
+      processExecute(false);
+    } finally {
+      // Close even when a bulk failure was reported, so the node is not leaked
+      try {
+        client.close();
+      } finally {
+        node.close();
+      }
+    }
+  }
+
+  /**
+   * Starts a client-only elasticsearch node, joining the cluster named by
+   * {@link ElasticConstants#CLUSTER} (or the node builder's default cluster
+   * when unset), and reads the target index and bulk limits from the job
+   * configuration.
+   */
+  @Override
+  public void open(TaskAttemptContext job) throws IOException {
+    String clusterName = job.getConfiguration().get(ElasticConstants.CLUSTER);
+    if (clusterName != null) {
+      node = nodeBuilder().clusterName(clusterName).client(true).node();
+    } else {
+      node = nodeBuilder().client(true).node();
+    }
+    client = node.client();
+
+    bulk = client.prepareBulk();
+    defaultIndex = job.getConfiguration().get(ElasticConstants.INDEX, "index");
+    maxBulkDocs = job.getConfiguration().getInt(
+        ElasticConstants.MAX_BULK_DOCS, DEFAULT_MAX_BULK_DOCS);
+    maxBulkLength = job.getConfiguration().getInt(
+        ElasticConstants.MAX_BULK_LENGTH, DEFAULT_MAX_BULK_LENGTH);
+  }
+
+}

Propchange: 
nutch/branches/2.x/src/java/org/apache/nutch/indexer/elastic/ElasticWriter.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain


Reply via email to