Repository: nutch
Updated Branches:
  refs/heads/2.x d868f06cf -> 9e7c0e6fa


fix for NUTCH-2238 contributed by ptorrestr


Project: http://git-wip-us.apache.org/repos/asf/nutch/repo
Commit: http://git-wip-us.apache.org/repos/asf/nutch/commit/7e43de60
Tree: http://git-wip-us.apache.org/repos/asf/nutch/tree/7e43de60
Diff: http://git-wip-us.apache.org/repos/asf/nutch/diff/7e43de60

Branch: refs/heads/2.x
Commit: 7e43de60bbace397aecabb7c0b960555aac313a8
Parents: 876aa4f
Author: Pablo Torres <[email protected]>
Authored: Thu Mar 3 13:42:50 2016 +0000
Committer: Pablo Torres <[email protected]>
Committed: Thu Mar 3 13:42:50 2016 +0000

----------------------------------------------------------------------
 src/plugin/build.xml                            |   2 +
 src/plugin/indexer-elastic2/build-ivy.xml       |  54 ++++
 src/plugin/indexer-elastic2/build.xml           |  31 +++
 .../indexer-elastic2/howto_upgrade_es.txt       |   6 +
 src/plugin/indexer-elastic2/ivy.xml             |  36 +++
 src/plugin/indexer-elastic2/plugin.xml          |  71 +++++
 .../indexwriter/elastic2/ElasticConstants.java  |  28 ++
 .../elastic2/ElasticIndexWriter.java            | 273 +++++++++++++++++++
 .../indexwriter/elastic2/package-info.java      |  22 ++
 9 files changed, 523 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nutch/blob/7e43de60/src/plugin/build.xml
----------------------------------------------------------------------
diff --git a/src/plugin/build.xml b/src/plugin/build.xml
index 73386a9..47f4714 100755
--- a/src/plugin/build.xml
+++ b/src/plugin/build.xml
@@ -34,6 +34,7 @@
      <ant dir="index-metadata" target="deploy"/>
      <ant dir="indexer-solr" target="deploy"/>
      <ant dir="indexer-elastic" target="deploy"/>
+     <ant dir="indexer-elastic2" target="deploy"/>
      <ant dir="language-identifier" target="deploy"/>
      <ant dir="lib-http" target="deploy"/>
      <ant dir="lib-nekohtml" target="deploy"/>
@@ -122,6 +123,7 @@
     <ant dir="index-metadata" target="clean"/>
     <ant dir="indexer-solr" target="clean"/>
     <ant dir="indexer-elastic" target="clean"/>
+    <ant dir="indexer-elastic2" target="clean"/>
     <ant dir="language-identifier" target="clean"/>
     <ant dir="lib-http" target="clean"/>
     <ant dir="lib-nekohtml" target="clean"/>

http://git-wip-us.apache.org/repos/asf/nutch/blob/7e43de60/src/plugin/indexer-elastic2/build-ivy.xml
----------------------------------------------------------------------
diff --git a/src/plugin/indexer-elastic2/build-ivy.xml b/src/plugin/indexer-elastic2/build-ivy.xml
new file mode 100644
index 0000000..8dc7fdb
--- /dev/null
+++ b/src/plugin/indexer-elastic2/build-ivy.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project name="indexer-elastic2" default="deps-jar" xmlns:ivy="antlib:org.apache.ivy.ant">
+
+    <property name="ivy.install.version" value="2.1.0" />
+    <condition property="ivy.home" value="${env.IVY_HOME}">
+      <isset property="env.IVY_HOME" />
+    </condition>
+    <property name="ivy.home" value="${user.home}/.ant" />
+    <property name="ivy.checksums" value="" />
+    <property name="ivy.jar.dir" value="${ivy.home}/lib" />
+    <property name="ivy.jar.file" value="${ivy.jar.dir}/ivy.jar" />
+
+    <target name="download-ivy" unless="offline">
+
+        <mkdir dir="${ivy.jar.dir}"/>
+        <!-- download Ivy from web site so that it can be used even without any special installation -->
+        <get src="http://repo2.maven.org/maven2/org/apache/ivy/ivy/${ivy.install.version}/ivy-${ivy.install.version}.jar"
+             dest="${ivy.jar.file}" usetimestamp="true"/>
+    </target>
+
+    <target name="init-ivy" depends="download-ivy">
+      <!-- try to load ivy here from ivy home, in case the user has not already dropped
+              it into ant's lib dir (note that the latter copy will always take precedence).
+              We will not fail as long as local lib dir exists (it may be empty) and
+              ivy is in at least one of ant's lib dir or the local lib dir. -->
+        <path id="ivy.lib.path">
+            <fileset dir="${ivy.jar.dir}" includes="*.jar"/>
+
+        </path>
+        <taskdef resource="org/apache/ivy/ant/antlib.xml"
+                 uri="antlib:org.apache.ivy.ant" classpathref="ivy.lib.path"/>
+    </target>
+
+  <target name="deps-jar" depends="init-ivy">
+    <ivy:retrieve pattern="lib/[artifact]-[revision].[ext]"/>
+  </target>
+
+</project>

http://git-wip-us.apache.org/repos/asf/nutch/blob/7e43de60/src/plugin/indexer-elastic2/build.xml
----------------------------------------------------------------------
diff --git a/src/plugin/indexer-elastic2/build.xml b/src/plugin/indexer-elastic2/build.xml
new file mode 100644
index 0000000..f0d2785
--- /dev/null
+++ b/src/plugin/indexer-elastic2/build.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project name="indexer-elastic2" default="jar-core">
+
+  <import file="../build-plugin.xml" />
+
+  <target name="jar" depends="compile">
+    <taskdef name="jarjar" classname="com.tonicsystems.jarjar.JarJarTask" classpath="${deploy.dir}/jarjar-1.3.jar"/>
+    <jarjar jarfile="${build.dir}/${name}.jar" basedir="${build.classes}"> 
+      <!-- Create a uber jar to avoid conflict with guava-14 -->
+      <zipfileset src="${deploy.dir}/elasticsearch-2.2.0.jar"/>
+      <zipfileset src="${deploy.dir}/guava-18.0.jar"/>
+      <rule pattern="com.google.**" result="shade.com.google.@1"/> 
+    </jarjar>
+  </target>
+</project>

http://git-wip-us.apache.org/repos/asf/nutch/blob/7e43de60/src/plugin/indexer-elastic2/howto_upgrade_es.txt
----------------------------------------------------------------------
diff --git a/src/plugin/indexer-elastic2/howto_upgrade_es.txt b/src/plugin/indexer-elastic2/howto_upgrade_es.txt
new file mode 100644
index 0000000..fd2d108
--- /dev/null
+++ b/src/plugin/indexer-elastic2/howto_upgrade_es.txt
@@ -0,0 +1,6 @@
+1. Upgrade elasticsearch dependency in src/plugin/indexer-elastic2/ivy.xml
+
+2. Upgrade the Elasticsearch specific dependencies in src/plugin/indexer-elastic2/plugin.xml
+   To get the list of dependencies and their versions execute:
+   $ ant -f ./build-ivy.xml
+   $ ls lib | sed 's/^/      <library name="/g' | sed 's/$/"\/>/g'

http://git-wip-us.apache.org/repos/asf/nutch/blob/7e43de60/src/plugin/indexer-elastic2/ivy.xml
----------------------------------------------------------------------
diff --git a/src/plugin/indexer-elastic2/ivy.xml b/src/plugin/indexer-elastic2/ivy.xml
new file mode 100644
index 0000000..a37d6fe
--- /dev/null
+++ b/src/plugin/indexer-elastic2/ivy.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" ?>
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one or more contributor 
+license agreements. See the NOTICE file distributed with this work for additional 
+information regarding copyright ownership. The ASF licenses this file to 
+You under the Apache License, Version 2.0 (the "License"); you may not use 
+this file except in compliance with the License. You may obtain a copy of 
+the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
+by applicable law or agreed to in writing, software distributed under the 
+License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
+OF ANY KIND, either express or implied. See the License for the specific 
+language governing permissions and limitations under the License. -->
+
+<ivy-module version="1.0">
+  <info organisation="org.apache.nutch" module="${ant.project.name}">
+    <license name="Apache 2.0" />
+    <ivyauthor name="Apache Nutch Team" url="http://nutch.apache.org" />
+    <description>Apache Nutch</description>
+  </info>
+
+  <configurations>
+    <include file="../../..//ivy/ivy-configurations.xml" />
+  </configurations>
+
+  <publications>
+    <!--get the artifact from our module name -->
+    <artifact conf="master" />
+  </publications>
+
+  <dependencies>
+    <dependency org="org.elasticsearch" name="elasticsearch" rev="2.2.0" conf="*->default" />
+    <!--create a uber jar-->
+    <dependency org="com.googlecode.jarjar" name="jarjar" rev="1.3"/>
+  </dependencies>
+
+</ivy-module>

http://git-wip-us.apache.org/repos/asf/nutch/blob/7e43de60/src/plugin/indexer-elastic2/plugin.xml
----------------------------------------------------------------------
diff --git a/src/plugin/indexer-elastic2/plugin.xml b/src/plugin/indexer-elastic2/plugin.xml
new file mode 100644
index 0000000..cfc4fe6
--- /dev/null
+++ b/src/plugin/indexer-elastic2/plugin.xml
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<plugin id="indexer-elastic2" name="ElasticIndexWriter" version="1.0.0"
+  provider-name="nutch.apache.org">
+
+  <runtime>
+    <library name="indexer-elastic2.jar">
+      <export name="*" />
+    </library>
+
+    <library name="HdrHistogram-2.1.6.jar"/>
+    <library name="commons-cli-1.3.1.jar"/>
+    <library name="compress-lzf-1.0.2.jar"/>
+    <library name="elasticsearch-2.2.0.jar"/>
+    <library name="guava-18.0.jar"/>
+    <library name="hppc-0.7.1.jar"/>
+    <library name="jackson-core-2.6.2.jar"/>
+    <library name="jackson-dataformat-cbor-2.6.2.jar"/>
+    <library name="jackson-dataformat-smile-2.6.2.jar"/>
+    <library name="jackson-dataformat-yaml-2.6.2.jar"/>
+    <library name="joda-convert-1.2.jar"/>
+    <library name="joda-time-2.8.2.jar"/>
+    <library name="jsr166e-1.1.0.jar"/>
+    <library name="lucene-analyzers-common-5.4.1.jar"/>
+    <library name="lucene-backward-codecs-5.4.1.jar"/>
+    <library name="lucene-core-5.4.1.jar"/>
+    <library name="lucene-grouping-5.4.1.jar"/>
+    <library name="lucene-highlighter-5.4.1.jar"/>
+    <library name="lucene-join-5.4.1.jar"/>
+    <library name="lucene-memory-5.4.1.jar"/>
+    <library name="lucene-misc-5.4.1.jar"/>
+    <library name="lucene-queries-5.4.1.jar"/>
+    <library name="lucene-queryparser-5.4.1.jar"/>
+    <library name="lucene-sandbox-5.4.1.jar"/>
+    <library name="lucene-spatial-5.4.1.jar"/>
+    <library name="lucene-spatial3d-5.4.1.jar"/>
+    <library name="lucene-suggest-5.4.1.jar"/>
+    <library name="netty-3.10.5.Final.jar"/>
+    <library name="securesm-1.0.jar"/>
+    <library name="snakeyaml-1.15.jar"/>
+    <library name="spatial4j-0.5.jar"/>
+    <library name="t-digest-3.0.jar"/>      
+  </runtime>
+
+  <requires>
+    <import plugin="nutch-extensionpoints" />
+  </requires>
+
+  <extension id="org.apache.nutch.indexer.elastic2"
+    name="Elasticsearch 2 Index Writer"
+    point="org.apache.nutch.indexer.IndexWriter">
+    <implementation id="ElasticIndexWriter"
+      class="org.apache.nutch.indexwriter.elastic2.ElasticIndexWriter" />
+  </extension>
+
+</plugin>

http://git-wip-us.apache.org/repos/asf/nutch/blob/7e43de60/src/plugin/indexer-elastic2/src/java/org/apache/nutch/indexwriter/elastic2/ElasticConstants.java
----------------------------------------------------------------------
diff --git a/src/plugin/indexer-elastic2/src/java/org/apache/nutch/indexwriter/elastic2/ElasticConstants.java b/src/plugin/indexer-elastic2/src/java/org/apache/nutch/indexwriter/elastic2/ElasticConstants.java
new file mode 100644
index 0000000..faa78cb
--- /dev/null
+++ b/src/plugin/indexer-elastic2/src/java/org/apache/nutch/indexwriter/elastic2/ElasticConstants.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.indexwriter.elastic2;
+
+public interface ElasticConstants {
+  public static final String ELASTIC_PREFIX = "elastic.";
+
+  public static final String HOST = ELASTIC_PREFIX + "host";
+  public static final String PORT = ELASTIC_PREFIX + "port";
+  public static final String CLUSTER = ELASTIC_PREFIX + "cluster";
+  public static final String INDEX = ELASTIC_PREFIX + "index";
+  public static final String MAX_BULK_DOCS = ELASTIC_PREFIX + "max.bulk.docs";
+  public static final String MAX_BULK_LENGTH = ELASTIC_PREFIX + "max.bulk.size";
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/7e43de60/src/plugin/indexer-elastic2/src/java/org/apache/nutch/indexwriter/elastic2/ElasticIndexWriter.java
----------------------------------------------------------------------
diff --git a/src/plugin/indexer-elastic2/src/java/org/apache/nutch/indexwriter/elastic2/ElasticIndexWriter.java b/src/plugin/indexer-elastic2/src/java/org/apache/nutch/indexwriter/elastic2/ElasticIndexWriter.java
new file mode 100644
index 0000000..a2dc105
--- /dev/null
+++ b/src/plugin/indexer-elastic2/src/java/org/apache/nutch/indexwriter/elastic2/ElasticIndexWriter.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.indexwriter.elastic2;
+
+import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.net.InetAddress;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.indexer.IndexWriter;
+import org.apache.nutch.indexer.NutchDocument;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.ListenableActionFuture;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteRequestBuilder;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.settings.Settings.Builder;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.node.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class ElasticIndexWriter implements IndexWriter {
+  public static Logger LOG = LoggerFactory.getLogger(ElasticIndexWriter.class);
+
+  private static final int DEFAULT_MAX_BULK_DOCS = 250;
+  private static final int DEFAULT_MAX_BULK_LENGTH = 2500500;
+
+  private Client client;
+  private Node node;
+  private String defaultIndex;
+
+  private Configuration config;
+
+  private BulkRequestBuilder bulk;
+  private ListenableActionFuture<BulkResponse> execute;
+  private int port = -1;
+  private String host = null;
+  private String clusterName = null;
+  private int maxBulkDocs;
+  private int maxBulkLength;
+  private long indexedDocs = 0;
+  private int bulkDocs = 0;
+  private int bulkLength = 0;
+  private boolean createNewBulk = false;
+
+  @Override
+  public void open(Configuration job) throws IOException {
+    clusterName = job.get(ElasticConstants.CLUSTER);
+    host = job.get(ElasticConstants.HOST);
+    port = job.getInt(ElasticConstants.PORT, 9300);
+
+    Builder settingsBuilder = Settings.builder();
+
+    BufferedReader reader = new BufferedReader(
+        job.getConfResourceAsReader("elasticsearch.conf"));
+    String line;
+    String parts[];
+
+    while ((line = reader.readLine()) != null) {
+      if (StringUtils.isNotBlank(line) && !line.startsWith("#")) {
+        line.trim();
+        parts = line.split("=");
+
+        if (parts.length == 2) {
+          settingsBuilder.put(parts[0].trim(), parts[1].trim());
+        }
+      }
+    }
+
+    if (StringUtils.isNotBlank(clusterName))
+      settingsBuilder.put("cluster.name", clusterName);
+
+    // Set the cluster name and build the settings
+    Settings settings = settingsBuilder.build();
+
+    // Prefer TransportClient
+    if (host != null && port > 1) {
+      client = TransportClient.builder().settings(settings).build()
+          .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port));
+    } else if (clusterName != null) {
+      node = nodeBuilder().settings(settings).client(true).node();
+      client = node.client();
+    }
+
+    bulk = client.prepareBulk();
+    defaultIndex = job.get(ElasticConstants.INDEX, "nutch");
+    maxBulkDocs = job.getInt(ElasticConstants.MAX_BULK_DOCS,
+        DEFAULT_MAX_BULK_DOCS);
+    maxBulkLength = job.getInt(ElasticConstants.MAX_BULK_LENGTH,
+        DEFAULT_MAX_BULK_LENGTH);
+  }
+
+  @Override
+  public void write(NutchDocument doc) throws IOException {
+    String id = (String) doc.getFieldValue("id");
+    String type = doc.getDocumentMeta().get("type");
+    if (type == null)
+      type = "doc";
+    IndexRequestBuilder request = client.prepareIndex(defaultIndex, type, id);
+
+    Map<String, Object> source = new HashMap<String, Object>();
+
+    // Loop through all fields of this doc
+    for (String fieldName : doc.getFieldNames()) {
+      if (doc.getFieldValues(fieldName).size() > 1) {
+        source.put(fieldName, doc.getFieldValue(fieldName));
+        // Loop through the values to keep track of the size of this document
+        for (Object value : doc.getFieldValues(fieldName)) {
+          bulkLength += value.toString().length();
+        }
+      } else {
+        source.put(fieldName, doc.getFieldValue(fieldName));
+        bulkLength += doc.getFieldValue(fieldName).toString().length();
+      }
+    }
+    request.setSource(source);
+
+    // Add this indexing request to a bulk request
+    bulk.add(request);
+    indexedDocs++;
+    bulkDocs++;
+
+    if (bulkDocs >= maxBulkDocs || bulkLength >= maxBulkLength) {
+      LOG.info("Processing bulk request [docs = " + bulkDocs + ", length = "
+          + bulkLength + ", total docs = " + indexedDocs
+          + ", last doc in bulk = '" + id + "']");
+      // Flush the bulk of indexing requests
+      createNewBulk = true;
+      commit();
+    }
+  }
+
+  @Override
+  public void delete(String key) throws IOException {
+    try {
+      DeleteRequestBuilder builder = client.prepareDelete();
+      builder.setIndex(defaultIndex);
+      builder.setType("doc");
+      builder.setId(key);
+      builder.execute().actionGet();
+    } catch (ElasticsearchException e) {
+      throw makeIOException(e);
+    }
+  }
+
+  public static IOException makeIOException(ElasticsearchException e) {
+    final IOException ioe = new IOException();
+    ioe.initCause(e);
+    return ioe;
+  }
+
+  @Override
+  public void update(NutchDocument doc) throws IOException {
+    write(doc);
+  }
+
+  @Override
+  public void commit() throws IOException {
+    if (execute != null) {
+      // wait for previous to finish
+      long beforeWait = System.currentTimeMillis();
+      BulkResponse actionGet = execute.actionGet();
+      if (actionGet.hasFailures()) {
+        for (BulkItemResponse item : actionGet) {
+          if (item.isFailed()) {
+            throw new RuntimeException("First failure in bulk: "
+                + item.getFailureMessage());
+          }
+        }
+      }
+      long msWaited = System.currentTimeMillis() - beforeWait;
+      LOG.info("Previous took in ms " + actionGet.getTookInMillis()
+          + ", including wait " + msWaited);
+      execute = null;
+    }
+    if (bulk != null) {
+      if (bulkDocs > 0) {
+        // start a flush, note that this is an asynchronous call
+        execute = bulk.execute();
+      }
+      bulk = null;
+    }
+    if (createNewBulk) {
+      // Prepare a new bulk request
+      bulk = client.prepareBulk();
+      bulkDocs = 0;
+      bulkLength = 0;
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    // Flush pending requests
+    LOG.info("Processing remaining requests [docs = " + bulkDocs
+        + ", length = " + bulkLength + ", total docs = " + indexedDocs + "]");
+    createNewBulk = false;
+    commit();
+    // flush one more time to finalize the last bulk
+    LOG.info("Processing to finalize last execute");
+    createNewBulk = false;
+    commit();
+
+    // Close
+    client.close();
+    if (node != null) {
+      node.close();
+    }
+  }
+
+  @Override
+  public String describe() {
+    StringBuffer sb = new StringBuffer("ElasticIndexWriter\n");
+    sb.append("\t").append(ElasticConstants.CLUSTER)
+        .append(" : elastic prefix cluster\n");
+    sb.append("\t").append(ElasticConstants.HOST).append(" : hostname\n");
+    sb.append("\t").append(ElasticConstants.PORT)
+        .append(" : port  (default 9300)\n");
+    sb.append("\t").append(ElasticConstants.INDEX)
+        .append(" : elastic index command \n");
+    sb.append("\t").append(ElasticConstants.MAX_BULK_DOCS)
+        .append(" : elastic bulk index doc counts. (default 250) \n");
+    sb.append("\t").append(ElasticConstants.MAX_BULK_LENGTH)
+        .append(" : elastic bulk index length. (default 2500500 ~2.5MB)\n");
+    return sb.toString();
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    config = conf;
+    String cluster = conf.get(ElasticConstants.CLUSTER);
+    String host = conf.get(ElasticConstants.HOST);
+
+    if (StringUtils.isBlank(cluster) && StringUtils.isBlank(host)) {
+      String message = "Missing elastic.cluster and elastic.host. At least one of them should be set in nutch-site.xml ";
+      message += "\n" + describe();
+      LOG.error(message);
+      throw new RuntimeException(message);
+    }
+  }
+
+  @Override
+  public Configuration getConf() {
+    return config;
+  }
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/7e43de60/src/plugin/indexer-elastic2/src/java/org/apache/nutch/indexwriter/elastic2/package-info.java
----------------------------------------------------------------------
diff --git a/src/plugin/indexer-elastic2/src/java/org/apache/nutch/indexwriter/elastic2/package-info.java b/src/plugin/indexer-elastic2/src/java/org/apache/nutch/indexwriter/elastic2/package-info.java
new file mode 100644
index 0000000..1b12be7
--- /dev/null
+++ b/src/plugin/indexer-elastic2/src/java/org/apache/nutch/indexwriter/elastic2/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Index writer plugin for <a href="http://www.elasticsearch.org/">Elasticsearch</a>.
+ */
+package org.apache.nutch.indexwriter.elastic2;
+

Reply via email to