Author: ferdy
Date: Wed Aug 1 14:24:14 2012
New Revision: 1368016
URL: http://svn.apache.org/viewvc?rev=1368016&view=rev
Log:
NUTCH-1445 Add ElasticIndexerJob that indexes to elasticsearch
Added:
nutch/branches/2.x/src/java/org/apache/nutch/indexer/elastic/
nutch/branches/2.x/src/java/org/apache/nutch/indexer/elastic/ElasticConstants.java
(with props)
nutch/branches/2.x/src/java/org/apache/nutch/indexer/elastic/ElasticIndexerJob.java
(with props)
nutch/branches/2.x/src/java/org/apache/nutch/indexer/elastic/ElasticWriter.java
(with props)
Modified:
nutch/branches/2.x/CHANGES.txt
nutch/branches/2.x/ivy/ivy.xml
nutch/branches/2.x/ivy/ivysettings.xml
Modified: nutch/branches/2.x/CHANGES.txt
URL:
http://svn.apache.org/viewvc/nutch/branches/2.x/CHANGES.txt?rev=1368016&r1=1368015&r2=1368016&view=diff
==============================================================================
--- nutch/branches/2.x/CHANGES.txt (original)
+++ nutch/branches/2.x/CHANGES.txt Wed Aug 1 14:24:14 2012
@@ -1,6 +1,8 @@
Nutch Change Log
Release 2.1 - Current Development
+* NUTCH-1445 Add ElasticIndexerJob that indexes to elasticsearch (ferdy)
+
* NUTCH-1444 Indexing should not create temporary files (do not extend from
FileOutputFormat) (ferdy)
* NUTCH-1443 Solr schema version is invalid (markus)
Modified: nutch/branches/2.x/ivy/ivy.xml
URL:
http://svn.apache.org/viewvc/nutch/branches/2.x/ivy/ivy.xml?rev=1368016&r1=1368015&r2=1368016&view=diff
==============================================================================
--- nutch/branches/2.x/ivy/ivy.xml (original)
+++ nutch/branches/2.x/ivy/ivy.xml Wed Aug 1 14:24:14 2012
@@ -32,6 +32,9 @@
</publications>
<dependencies>
+ <dependency org="org.elasticsearch" name="elasticsearch" rev="0.19.4"
+ conf="*->default,sources"/>
+
<dependency org="org.apache.solr" name="solr-solrj" rev="3.4.0"
conf="*->default" />
<dependency org="org.slf4j" name="slf4j-log4j12" rev="1.6.1"
Modified: nutch/branches/2.x/ivy/ivysettings.xml
URL:
http://svn.apache.org/viewvc/nutch/branches/2.x/ivy/ivysettings.xml?rev=1368016&r1=1368015&r2=1368016&view=diff
==============================================================================
--- nutch/branches/2.x/ivy/ivysettings.xml (original)
+++ nutch/branches/2.x/ivy/ivysettings.xml Wed Aug 1 14:24:14 2012
@@ -29,6 +29,9 @@
http://ibiblio.lsu.edu/main/pub/packages/maven2
http://www.ibiblio.net/pub/packages/maven2
-->
+ <property name="oss.sonatype.org"
+ value="http://oss.sonatype.org/content/repositories/releases/"
+ override="false"/>
<property name="repo.maven.org"
value="http://repo1.maven.org/maven2/"
override="false"/>
@@ -58,19 +61,28 @@
pattern="${maven2.pattern.ext}"
m2compatible="true"
/>
+ <ibiblio name="sonatype"
+ root="${oss.sonatype.org}"
+ pattern="${maven2.pattern.ext}"
+ m2compatible="true"
+ />
+
<chain name="default" dual="true">
<resolver ref="local"/>
<resolver ref="maven2"/>
+ <resolver ref="sonatype"/>
</chain>
<chain name="internal">
<resolver ref="local"/>
</chain>
<chain name="external">
<resolver ref="maven2"/>
+ <resolver ref="sonatype"/>
</chain>
<chain name="external-and-snapshots">
<resolver ref="maven2"/>
<resolver ref="apache-snapshot"/>
+ <resolver ref="sonatype"/>
</chain>
<chain name="restletchain">
<resolver ref="restlet"/>
Added:
nutch/branches/2.x/src/java/org/apache/nutch/indexer/elastic/ElasticConstants.java
URL:
http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/indexer/elastic/ElasticConstants.java?rev=1368016&view=auto
==============================================================================
---
nutch/branches/2.x/src/java/org/apache/nutch/indexer/elastic/ElasticConstants.java
(added)
+++
nutch/branches/2.x/src/java/org/apache/nutch/indexer/elastic/ElasticConstants.java
Wed Aug 1 14:24:14 2012
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.indexer.elastic;
+
+/**
+ * Names of the configuration properties used by the elasticsearch indexer.
+ * Every name shares the {@link #ELASTIC_PREFIX} prefix.
+ */
+public interface ElasticConstants {
+  /** Prefix prepended to every property name declared in this interface. */
+  String ELASTIC_PREFIX = "elastic.";
+
+  /** Property naming the elasticsearch cluster to join. */
+  String CLUSTER = ELASTIC_PREFIX + "cluster";
+
+  /** Property naming the index that documents are written to. */
+  String INDEX = ELASTIC_PREFIX + "index";
+
+  /** Property limiting the number of documents in a single bulk request. */
+  String MAX_BULK_DOCS = ELASTIC_PREFIX + "max.bulk.docs";
+
+  /**
+   * Property limiting the accumulated length of a single bulk request.
+   * Note that the property key ends in "size" while the constant is named
+   * "LENGTH".
+   */
+  String MAX_BULK_LENGTH = ELASTIC_PREFIX + "max.bulk.size";
+}
Propchange:
nutch/branches/2.x/src/java/org/apache/nutch/indexer/elastic/ElasticConstants.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added:
nutch/branches/2.x/src/java/org/apache/nutch/indexer/elastic/ElasticIndexerJob.java
URL:
http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/indexer/elastic/ElasticIndexerJob.java?rev=1368016&view=auto
==============================================================================
---
nutch/branches/2.x/src/java/org/apache/nutch/indexer/elastic/ElasticIndexerJob.java
(added)
+++
nutch/branches/2.x/src/java/org/apache/nutch/indexer/elastic/ElasticIndexerJob.java
Wed Aug 1 14:24:14 2012
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.indexer.elastic;
+
+import java.util.Map;
+
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.indexer.IndexerJob;
+import org.apache.nutch.indexer.NutchIndexWriterFactory;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.ToolUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Indexer for elasticsearch. Uses bulk operations with flushing in the
+ * background, keeping track of elasticsearch responses by checking after
+ * every flush. When a previous flush has not finished yet before the next
+ * bulk is full, it will wait for it. This mechanism will keep the servers
+ * from overloading.
+ */
+public class ElasticIndexerJob extends IndexerJob {
+
+  public static Logger LOG =
+      LoggerFactory.getLogger(ElasticIndexerJob.class);
+
+  /**
+   * Registers {@link ElasticWriter} as index writer, creates the index job
+   * and blocks until it has completed.
+   *
+   * @param args argument map; {@link Nutch#ARG_BATCH} holds the batch id and
+   *          {@link ElasticConstants#CLUSTER} the cluster name. The cluster
+   *          entry may be absent, in which case the configuration is left
+   *          as it is.
+   * @return the {@code results} map after the job status has been recorded
+   * @throws Exception if creating or running the job fails
+   */
+  @Override
+  public Map<String,Object> run(Map<String,Object> args) throws Exception {
+    LOG.info("Starting");
+
+    NutchIndexWriterFactory.addClassToConf(getConf(), ElasticWriter.class);
+    String batchId = (String)args.get(Nutch.ARG_BATCH);
+    String clusterName = (String)args.get(ElasticConstants.CLUSTER);
+
+    // Configuration.set does not accept null values. ElasticWriter.open
+    // copes with a missing cluster property, so only set it when given.
+    if (clusterName != null) {
+      getConf().set(ElasticConstants.CLUSTER, clusterName);
+    }
+
+    currentJob = createIndexJob(getConf(),
+        "elastic-index [" + clusterName + "]", batchId);
+
+    currentJob.waitForCompletion(true);
+    ToolUtil.recordJobStatus(null, currentJob, results);
+
+    LOG.info("Done");
+    return results;
+  }
+
+  /**
+   * Convenience wrapper that packs its arguments into an argument map and
+   * delegates to {@link #run(Map)}.
+   *
+   * @param clusterName name of the elasticsearch cluster
+   * @param batchId batch selector handed to the index job
+   * @throws Exception if the job fails
+   */
+  public void indexElastic(String clusterName, String batchId)
+      throws Exception {
+    run(ToolUtil.toArgMap(ElasticConstants.CLUSTER, clusterName,
+        Nutch.ARG_BATCH, batchId));
+  }
+
+  /**
+   * Command line entry point. Accepts either two arguments (cluster name
+   * and batch selector) or four, where the third is {@code -crawlId} and
+   * the fourth the crawl id.
+   *
+   * @param args command line arguments
+   * @return 0 after the job has run, -1 when the arguments are not
+   *         understood
+   * @throws Exception if the job fails
+   */
+  public int run(String[] args) throws Exception {
+    boolean withCrawlId = args.length == 4 && "-crawlId".equals(args[2]);
+    if (args.length != 2 && !withCrawlId) {
+      System.err.println("Usage: <elastic cluster name> "
+          + "(<batchId> | -all | -reindex) [-crawlId <id>]");
+      return -1;
+    }
+    if (withCrawlId) {
+      getConf().set(Nutch.CRAWL_ID_KEY, args[3]);
+    }
+    indexElastic(args[0], args[1]);
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(),
+        new ElasticIndexerJob(), args);
+    System.exit(res);
+  }
+}
Propchange:
nutch/branches/2.x/src/java/org/apache/nutch/indexer/elastic/ElasticIndexerJob.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added:
nutch/branches/2.x/src/java/org/apache/nutch/indexer/elastic/ElasticWriter.java
URL:
http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/indexer/elastic/ElasticWriter.java?rev=1368016&view=auto
==============================================================================
---
nutch/branches/2.x/src/java/org/apache/nutch/indexer/elastic/ElasticWriter.java
(added)
+++
nutch/branches/2.x/src/java/org/apache/nutch/indexer/elastic/ElasticWriter.java
Wed Aug 1 14:24:14 2012
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.indexer.elastic;
+
+import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.nutch.indexer.NutchDocument;
+import org.apache.nutch.indexer.NutchIndexWriter;
+import org.apache.nutch.util.TableUtil;
+import org.elasticsearch.action.ListenableActionFuture;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.node.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ElasticWriter implements NutchIndexWriter {
+
+ public static Logger LOG = LoggerFactory.getLogger(ElasticWriter.class);
+
+ private static final int DEFAULT_MAX_BULK_DOCS = 500;
+ private static final int DEFAULT_MAX_BULK_LENGTH = 5001001; // ~5MB
+
+ private Client client;
+ private Node node;
+ private String defaultIndex;
+
+ private BulkRequestBuilder bulk;
+ private ListenableActionFuture<BulkResponse> execute;
+ private int maxBulkDocs;
+ private int maxBulkLength;
+ private long indexedDocs = 0;
+ private int bulkDocs = 0;
+ private int bulkLength = 0;
+
+ @Override
+ public void write(NutchDocument doc) throws IOException {
+ String id = TableUtil.reverseUrl(doc.getFieldValue("url"));
+ IndexRequestBuilder request = client.prepareIndex(defaultIndex, doc
+ .getDocumentMeta().get("type"), id);
+
+ Map<String, Object> source = new HashMap<String, Object>();
+
+ // Loop through all fields of this doc
+ for (String fieldName : doc.getFieldNames()) {
+ if (doc.getFieldValues(fieldName).size() > 1) {
+ source.put(fieldName, doc.getFieldValues(fieldName));
+ // Loop through the values to keep track of the size of this document
+ for (String value : doc.getFieldValues(fieldName)) {
+ bulkLength += value.length();
+ }
+ } else {
+ source.put(fieldName, doc.getFieldValue(fieldName));
+ bulkLength += doc.getFieldValue(fieldName).length();
+ }
+ }
+ request.setSource(source);
+
+ // Add this indexing request to a bulk request
+ bulk.add(request);
+ indexedDocs++;
+ bulkDocs++;
+
+ if (bulkDocs >= maxBulkDocs || bulkLength >= maxBulkLength) {
+ LOG.info("Processing bulk request [docs = " + bulkDocs + ", length = "
+ + bulkLength + ", total docs = " + indexedDocs
+ + ", last doc in bulk = '" + id + "']");
+ // Flush the bulk of indexing requests
+ processExecute(true);
+
+ }
+ }
+
+ private void processExecute(boolean createNewBulk) {
+ if (execute != null) {
+ // wait for previous to finish
+ long beforeWait = System.currentTimeMillis();
+ BulkResponse actionGet = execute.actionGet();
+ if (actionGet.hasFailures()) {
+ for (BulkItemResponse item : actionGet) {
+ if (item.failed()) {
+ throw new RuntimeException("First failure in bulk: "
+ + item.getFailureMessage());
+ }
+ }
+ }
+ long msWaited = System.currentTimeMillis() - beforeWait;
+ LOG.info("Previous took in ms " + actionGet.getTookInMillis()
+ + ", including wait " + msWaited);
+ execute = null;
+ }
+ if (bulk != null) {
+ if (bulkDocs > 0) {
+ // start a flush, note that this is an asynchronous call
+ execute = bulk.execute();
+ }
+ bulk = null;
+ }
+ if (createNewBulk) {
+ // Prepare a new bulk request
+ bulk = client.prepareBulk();
+ bulkDocs = 0;
+ bulkLength = 0;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ // Flush pending requests
+ LOG.info("Processing remaining requests [docs = " + bulkDocs
+ + ", length = " + bulkLength + ", total docs = " + indexedDocs + "]");
+ // flush one more time to finalize the last bulk
+ LOG.info("Processing to finalize last execute");
+ processExecute(false);
+
+ // Close
+ client.close();
+ node.close();
+ }
+
+ @Override
+ public void open(TaskAttemptContext job) throws IOException {
+ String clusterName = job.getConfiguration().get(ElasticConstants.CLUSTER);
+ if (clusterName != null) {
+ node = nodeBuilder().clusterName(clusterName).client(true).node();
+ } else {
+ node = nodeBuilder().client(true).node();
+ }
+ client = node.client();
+
+ bulk = client.prepareBulk();
+ defaultIndex = job.getConfiguration().get(ElasticConstants.INDEX, "index");
+ maxBulkDocs = job.getConfiguration().getInt(
+ ElasticConstants.MAX_BULK_DOCS, DEFAULT_MAX_BULK_DOCS);
+ maxBulkLength = job.getConfiguration().getInt(
+ ElasticConstants.MAX_BULK_LENGTH, DEFAULT_MAX_BULK_LENGTH);
+ }
+
+}
Propchange:
nutch/branches/2.x/src/java/org/apache/nutch/indexer/elastic/ElasticWriter.java
------------------------------------------------------------------------------
svn:mime-type = text/plain