Author: jnioche
Date: Fri May 30 14:55:51 2014
New Revision: 1598622
URL: http://svn.apache.org/r1598622
Log:
NUTCH-1768 Upgrade to ElasticSearch 1.1.0
Modified:
nutch/branches/2.x/CHANGES.txt
nutch/branches/2.x/ivy/ivy.xml
nutch/branches/2.x/src/plugin/indexer-elastic/ivy.xml
nutch/branches/2.x/src/plugin/indexer-elastic/plugin.xml
nutch/branches/2.x/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
Modified: nutch/branches/2.x/CHANGES.txt
URL:
http://svn.apache.org/viewvc/nutch/branches/2.x/CHANGES.txt?rev=1598622&r1=1598621&r2=1598622&view=diff
==============================================================================
--- nutch/branches/2.x/CHANGES.txt (original)
+++ nutch/branches/2.x/CHANGES.txt Fri May 30 14:55:51 2014
@@ -2,6 +2,8 @@ Nutch Change Log
Current Development
+* NUTCH-1768 Upgrade to ElasticSearch 1.1.0 (jnioche)
+
* NUTCH-1634 readdb -stats shows the result twice (kaveh minooie via jnioche)
* NUTCH-1780 ttl and gc_grace_seconds attributes are missing from
gora-cassandra-mapping.xml file (kaveh minooie via lewismc)
Modified: nutch/branches/2.x/ivy/ivy.xml
URL:
http://svn.apache.org/viewvc/nutch/branches/2.x/ivy/ivy.xml?rev=1598622&r1=1598621&r2=1598622&view=diff
==============================================================================
--- nutch/branches/2.x/ivy/ivy.xml (original)
+++ nutch/branches/2.x/ivy/ivy.xml Fri May 30 14:55:51 2014
@@ -32,9 +32,6 @@
</publications>
<dependencies>
- <dependency org="org.elasticsearch" name="elasticsearch" rev="0.19.4"
- conf="*->default"/>
-
<dependency org="org.apache.solr" name="solr-solrj" rev="4.6.0"
conf="*->default" />
<dependency org="org.slf4j" name="slf4j-log4j12" rev="1.6.1"
Modified: nutch/branches/2.x/src/plugin/indexer-elastic/ivy.xml
URL:
http://svn.apache.org/viewvc/nutch/branches/2.x/src/plugin/indexer-elastic/ivy.xml?rev=1598622&r1=1598621&r2=1598622&view=diff
==============================================================================
--- nutch/branches/2.x/src/plugin/indexer-elastic/ivy.xml (original)
+++ nutch/branches/2.x/src/plugin/indexer-elastic/ivy.xml Fri May 30 14:55:51 2014
@@ -29,7 +29,7 @@ language governing permissions and limit
<dependencies>
<dependency org="org.elasticsearch" name="elasticsearch"
- rev="0.90.1" conf="*->default" />
+ rev="1.1.0" conf="*->default" />
</dependencies>
</ivy-module>
Modified: nutch/branches/2.x/src/plugin/indexer-elastic/plugin.xml
URL:
http://svn.apache.org/viewvc/nutch/branches/2.x/src/plugin/indexer-elastic/plugin.xml?rev=1598622&r1=1598621&r2=1598622&view=diff
==============================================================================
--- nutch/branches/2.x/src/plugin/indexer-elastic/plugin.xml (original)
+++ nutch/branches/2.x/src/plugin/indexer-elastic/plugin.xml Fri May 30 14:55:51 2014
@@ -23,22 +23,21 @@
<export name="*" />
</library>
- <library name="elasticsearch-0.90.1.jar"/>
- <library name="jna-3.3.0.jar"/>
- <library name="jts-1.12.jar"/>
- <library name="log4j-1.2.17.jar"/>
- <library name="lucene-codecs-4.3.0.jar"/>
- <library name="lucene-core-4.3.0.jar"/>
- <library name="lucene-grouping-4.3.0.jar"/>
- <library name="lucene-highlighter-4.3.0.jar"/>
- <library name="lucene-join-4.3.0.jar"/>
- <library name="lucene-memory-4.3.0.jar"/>
- <library name="lucene-queries-4.3.0.jar"/>
- <library name="lucene-queryparser-4.3.0.jar"/>
- <library name="lucene-sandbox-4.3.0.jar"/>
- <library name="lucene-spatial-4.3.0.jar"/>
- <library name="lucene-suggest-4.3.0.jar"/>
- <library name="spatial4j-0.3.jar"/>
+ <library name="elasticsearch-1.1.0.jar"/>
+ <library name="lucene-analyzers-common-4.7.0.jar"/>
+ <library name="lucene-codecs-4.7.0.jar"/>
+ <library name="lucene-core-4.7.0.jar"/>
+ <library name="lucene-grouping-4.7.0.jar"/>
+ <library name="lucene-highlighter-4.7.0.jar"/>
+ <library name="lucene-join-4.7.0.jar"/>
+ <library name="lucene-memory-4.7.0.jar"/>
+ <library name="lucene-misc-4.7.0.jar"/>
+ <library name="lucene-queries-4.7.0.jar"/>
+ <library name="lucene-queryparser-4.7.0.jar"/>
+ <library name="lucene-sandbox-4.7.0.jar"/>
+ <library name="lucene-spatial-4.7.0.jar"/>
+ <library name="lucene-suggest-4.7.0.jar"/>
+ <library name="spatial4j-0.4.1.jar"/>
</runtime>
<requires>
Modified: nutch/branches/2.x/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
URL:
http://svn.apache.org/viewvc/nutch/branches/2.x/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java?rev=1598622&r1=1598621&r2=1598622&view=diff
==============================================================================
--- nutch/branches/2.x/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java (original)
+++ nutch/branches/2.x/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java Fri May 30 14:55:51 2014
@@ -14,36 +14,32 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
+
package org.apache.nutch.indexwriter.elastic;
import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
+import java.io.BufferedReader;
import java.io.IOException;
-import java.net.URL;
import java.util.HashMap;
import java.util.Map;
-import java.io.BufferedReader;
-import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.nutch.indexer.NutchDocument;
import org.apache.nutch.indexer.IndexWriter;
-import org.elasticsearch.ElasticSearchException;
+import org.apache.nutch.indexer.NutchDocument;
+import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.ImmutableSettings.Builder;
-import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.node.Node;
import org.slf4j.Logger;
@@ -79,11 +75,13 @@ public class ElasticIndexWriter implemen
public void open(Configuration job) throws IOException {
clusterName = job.get(ElasticConstants.CLUSTER);
host = job.get(ElasticConstants.HOST);
- port = job.getInt(ElasticConstants.PORT, -1);
+ port = job.getInt(ElasticConstants.PORT, 9300);
+
+ Builder settingsBuilder = ImmutableSettings.settingsBuilder().classLoader(
+ Settings.class.getClassLoader());
- Builder settingsBuilder = ImmutableSettings.settingsBuilder();
-
- BufferedReader reader = new BufferedReader(job.getConfResourceAsReader("elasticsearch.conf"));
+ BufferedReader reader = new BufferedReader(
+ job.getConfResourceAsReader("elasticsearch.conf"));
String line;
String parts[];
@@ -98,12 +96,16 @@ public class ElasticIndexWriter implemen
}
}
+ if (StringUtils.isNotBlank(clusterName))
+ settingsBuilder.put("cluster.name", clusterName);
+
// Set the cluster name and build the settings
- Settings settings = settingsBuilder.put("cluster.name", clusterName).build();
-
+ Settings settings = settingsBuilder.build();
+
// Prefer TransportClient
if (host != null && port > 1) {
- client = new TransportClient(settings).addTransportAddress(new InetSocketTransportAddress(host, port));
+ client = new TransportClient(settings)
+ .addTransportAddress(new InetSocketTransportAddress(host, port));
} else if (clusterName != null) {
node = nodeBuilder().settings(settings).client(true).node();
client = node.client();
@@ -111,16 +113,18 @@ public class ElasticIndexWriter implemen
bulk = client.prepareBulk();
defaultIndex = job.get(ElasticConstants.INDEX, "nutch");
- maxBulkDocs = job.getInt(
- ElasticConstants.MAX_BULK_DOCS, DEFAULT_MAX_BULK_DOCS);
- maxBulkLength = job.getInt(
- ElasticConstants.MAX_BULK_LENGTH, DEFAULT_MAX_BULK_LENGTH);
+ maxBulkDocs = job.getInt(ElasticConstants.MAX_BULK_DOCS,
+ DEFAULT_MAX_BULK_DOCS);
+ maxBulkLength = job.getInt(ElasticConstants.MAX_BULK_LENGTH,
+ DEFAULT_MAX_BULK_LENGTH);
}
+
@Override
public void write(NutchDocument doc) throws IOException {
- String id = (String)doc.getFieldValue("url");
+ String id = (String) doc.getFieldValue("url");
String type = doc.getDocumentMeta().get("type");
- if (type == null) type = "doc";
+ if (type == null)
+ type = "doc";
IndexRequestBuilder request = client.prepareIndex(defaultIndex, type, id);
Map<String, Object> source = new HashMap<String, Object>();
@@ -147,30 +151,28 @@ public class ElasticIndexWriter implemen
if (bulkDocs >= maxBulkDocs || bulkLength >= maxBulkLength) {
LOG.info("Processing bulk request [docs = " + bulkDocs + ", length = "
- + bulkLength + ", total docs = " + indexedDocs
- + ", last doc in bulk = '" + id + "']");
+ + bulkLength + ", total docs = " + indexedDocs
+ + ", last doc in bulk = '" + id + "']");
// Flush the bulk of indexing requests
createNewBulk = true;
commit();
}
}
-
@Override
public void delete(String key) throws IOException {
- try{
- DeleteRequestBuilder builder = client.prepareDelete();
+ try {
+ DeleteRequestBuilder builder = client.prepareDelete();
builder.setIndex(defaultIndex);
- builder.setType("doc");
+ builder.setType("doc");
builder.setId(key);
builder.execute().actionGet();
- }catch(ElasticSearchException e)
- {
+ } catch (ElasticsearchException e) {
throw makeIOException(e);
}
}
- public static IOException makeIOException(ElasticSearchException e) {
+ public static IOException makeIOException(ElasticsearchException e) {
final IOException ioe = new IOException();
ioe.initCause(e);
return ioe;
@@ -191,13 +193,13 @@ public class ElasticIndexWriter implemen
for (BulkItemResponse item : actionGet) {
if (item.isFailed()) {
throw new RuntimeException("First failure in bulk: "
- + item.getFailureMessage());
+ + item.getFailureMessage());
}
}
}
long msWaited = System.currentTimeMillis() - beforeWait;
LOG.info("Previous took in ms " + actionGet.getTookInMillis()
- + ", including wait " + msWaited);
+ + ", including wait " + msWaited);
execute = null;
}
if (bulk != null) {
@@ -219,7 +221,7 @@ public class ElasticIndexWriter implemen
public void close() throws IOException {
// Flush pending requests
LOG.info("Processing remaining requests [docs = " + bulkDocs
- + ", length = " + bulkLength + ", total docs = " + indexedDocs + "]");
+ + ", length = " + bulkLength + ", total docs = " + indexedDocs + "]");
createNewBulk = false;
commit();
// flush one more time to finalize the last bulk
@@ -237,12 +239,17 @@ public class ElasticIndexWriter implemen
@Override
public String describe() {
StringBuffer sb = new StringBuffer("ElasticIndexWriter\n");
- sb.append("\t").append(ElasticConstants.CLUSTER).append(" : elastic prefix cluster\n");
+ sb.append("\t").append(ElasticConstants.CLUSTER)
+ .append(" : elastic prefix cluster\n");
sb.append("\t").append(ElasticConstants.HOST).append(" : hostname\n");
- sb.append("\t").append(ElasticConstants.PORT).append(" : port\n");
- sb.append("\t").append(ElasticConstants.INDEX).append(" : elastic index command \n");
- sb.append("\t").append(ElasticConstants.MAX_BULK_DOCS).append(" : elastic bulk index doc counts. (default 250) \n");
- sb.append("\t").append(ElasticConstants.MAX_BULK_LENGTH).append(" : elastic bulk index length. (default 2500500 ~2.5MB)\n");
+ sb.append("\t").append(ElasticConstants.PORT)
+ .append(" : port (default 9300)\n");
+ sb.append("\t").append(ElasticConstants.INDEX)
+ .append(" : elastic index command \n");
+ sb.append("\t").append(ElasticConstants.MAX_BULK_DOCS)
+ .append(" : elastic bulk index doc counts. (default 250) \n");
+ sb.append("\t").append(ElasticConstants.MAX_BULK_LENGTH)
+ .append(" : elastic bulk index length. (default 2500500 ~2.5MB)\n");
return sb.toString();
}
@@ -250,16 +257,18 @@ public class ElasticIndexWriter implemen
public void setConf(Configuration conf) {
config = conf;
String cluster = conf.get(ElasticConstants.CLUSTER);
- if (cluster == null) {
- String message = "Missing elastic.cluster. Should be set in nutch-site.xml ";
- message+="\n"+describe();
+ String host = conf.get(ElasticConstants.HOST);
+
+ if (StringUtils.isBlank(cluster) && StringUtils.isBlank(host)) {
+ String message = "Missing elastic.cluster and elastic.host. At least one of them should be set in nutch-site.xml ";
+ message += "\n" + describe();
LOG.error(message);
throw new RuntimeException(message);
}
}
-
+
@Override
public Configuration getConf() {
return config;
- }
+ }
}