[
https://issues.apache.org/jira/browse/NUTCH-2296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15419770#comment-15419770
]
ASF GitHub Bot commented on NUTCH-2296:
---------------------------------------
Github user lewismc commented on a diff in the pull request:
https://github.com/apache/nutch/pull/139#discussion_r74679117
--- Diff:
src/plugin/indexer-elastic-rest/src/java/org/apache/nutch/indexwriter/elasticrest/ElasticRestIndexWriter.java
---
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//TODO trust self signed and non matching certs: http://stackoverflow.com/questions/2893819/telling-java-to-accept-self-signed-ssl-certificate
+//TODO refactor the dependencies out of root ivy file
+
+package org.apache.nutch.indexwriter.elasticrest;
+
+import io.searchbox.client.JestClient;
+import io.searchbox.client.JestClientFactory;
+import io.searchbox.client.JestResult;
+import io.searchbox.client.JestResultHandler;
+import io.searchbox.client.config.HttpClientConfig;
+import io.searchbox.core.Bulk;
+import io.searchbox.core.BulkResult;
+import io.searchbox.core.Delete;
+import io.searchbox.core.Index;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.http.HttpResponse;
+import org.apache.http.concurrent.BasicFuture;
+import org.apache.http.conn.ssl.DefaultHostnameVerifier;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.nio.conn.SchemeIOSessionStrategy;
+import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.ssl.TrustStrategy;
+import org.apache.nutch.indexer.IndexWriter;
+import org.apache.nutch.indexer.NutchDocument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLContext;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.net.URL;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+
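+/**
+ * Nutch {@link IndexWriter} that sends documents to Elasticsearch over its
+ * REST API using the Jest client, batching index requests into bulk requests.
+ */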
+public class ElasticRestIndexWriter implements IndexWriter {
+ public static Logger LOG =
LoggerFactory.getLogger(ElasticRestIndexWriter.class);
+
+ // Fallback bulk limits, used when MAX_BULK_DOCS / MAX_BULK_LENGTH are not configured.
+ private static final int DEFAULT_MAX_BULK_DOCS = 250;
+ private static final int DEFAULT_MAX_BULK_LENGTH = 2500500;
+
+ // Jest REST client, created in open().
+ private JestClient client;
+ // Index used for every write and delete (config INDEX, default "nutch").
+ private String defaultIndex;
+ // Type used for deletes and for documents without "type" metadata (config TYPE, default "doc").
+ private String defaultType = null;
+
+ // NOTE(review): not assigned in the visible code; presumably set via setConf() -- confirm.
+ private Configuration config;
+
+ // Accumulates index actions until a bulk limit triggers commit().
+ private Bulk.Builder bulkBuilder;
+ // NOTE(review): not referenced in the visible code -- confirm it is still needed.
+ private Future<HttpResponse> execute;
+ // Connection settings, all read from the job configuration in open().
+ private int port = -1;
+ private String host = null;
+ private String user = null;
+ private Boolean https = null;
+ private String password = null;
+ // When true, hostname verification is skipped for https connections.
+ private Boolean trustAllHostnames = null;
+
+ // Bulk limits: a commit() is triggered once either is reached.
+ private int maxBulkDocs;
+ private int maxBulkLength;
+ // Total number of documents passed to write().
+ private long indexedDocs = 0;
+ // Documents in the pending bulk (presumably reset in commit() -- not visible here).
+ private int bulkDocs = 0;
+ // Approximate size of the pending bulk: sum of the field values' string lengths.
+ private int bulkLength = 0;
+ // Set by write() just before it triggers commit().
+ private boolean createNewBulk = false;
+ // Used in commit()'s timing log; its exact meaning is not visible here.
+ private long millis;
+ // Pending asynchronous bulk result; commit() waits on it before continuing.
+ private BasicFuture<JestResult> basicFuture = null;
+
+  /**
+   * Reads the connection and bulk settings from the job configuration and
+   * creates the Jest REST client.
+   *
+   * @param job  job configuration holding the ElasticRestConstants keys
+   * @param name not used by this writer
+   * @throws IOException if the node URL or the SSL context cannot be created
+   * @throws IllegalArgumentException if no host or no positive port is configured
+   */
+  @Override
+  public void open(JobConf job, String name) throws IOException {
+    host = job.get(ElasticRestConstants.HOST);
+    port = job.getInt(ElasticRestConstants.PORT, 9200);
+    user = job.get(ElasticRestConstants.USER);
+    password = job.get(ElasticRestConstants.PASSWORD);
+    https = job.getBoolean(ElasticRestConstants.HTTPS, false);
+    trustAllHostnames = job.getBoolean(ElasticRestConstants.HOSTNAME_TRUST, false);
+
+    // Validate before anything (URL, SSL setup) is built from these settings.
+    if (StringUtils.isBlank(host) || port <= 0) {
+      throw new IllegalArgumentException("No host and port specified");
+    }
+
+    URL urlOfElasticsearchNode = new URL(https ? "https" : "http", host, port, "");
+    HttpClientConfig.Builder builder =
+        new HttpClientConfig.Builder(urlOfElasticsearchNode.toString())
+            .multiThreaded(true)
+            .connTimeout(300000)
+            .readTimeout(300000);
+
+    if (https) {
+      // Credentials are only attached on https connections (presumably so
+      // they are never sent in clear text -- TODO confirm intent).
+      if (user != null && password != null) {
+        builder.defaultCredentials(user, password);
+      }
+
+      // NOTE(review): every server certificate is accepted; HOSTNAME_TRUST
+      // only controls hostname verification.
+      SSLContext sslContext = createTrustAllSslContext();
+      HostnameVerifier hostnameVerifier = trustAllHostnames
+          ? new NoopHostnameVerifier()
+          : new DefaultHostnameVerifier();
+
+      // Apply the same hostname policy to sync calls (socket factory) and
+      // async calls (IO session strategy).
+      builder.defaultSchemeForDiscoveredNodes("https")
+          .sslSocketFactory(new SSLConnectionSocketFactory(sslContext, hostnameVerifier))
+          .httpsIOSessionStrategy(new SSLIOSessionStrategy(sslContext, hostnameVerifier));
+    }
+
+    JestClientFactory jestClientFactory = new JestClientFactory();
+    jestClientFactory.setHttpClientConfig(builder.build());
+    client = jestClientFactory.getObject();
+
+    defaultIndex = job.get(ElasticRestConstants.INDEX, "nutch");
+    defaultType = job.get(ElasticRestConstants.TYPE, "doc");
+
+    maxBulkDocs = job.getInt(ElasticRestConstants.MAX_BULK_DOCS, DEFAULT_MAX_BULK_DOCS);
+    maxBulkLength = job.getInt(ElasticRestConstants.MAX_BULK_LENGTH, DEFAULT_MAX_BULK_LENGTH);
+
+    bulkBuilder = new Bulk.Builder()
+        .defaultIndex(defaultIndex)
+        .defaultType(defaultType);
+  }
+
+  /**
+   * Builds an SSL context that accepts any server certificate, including
+   * self-signed ones.
+   *
+   * @return the trust-all SSL context
+   * @throws IOException if the SSL context cannot be created (cause attached)
+   */
+  private static SSLContext createTrustAllSslContext() throws IOException {
+    try {
+      return new SSLContextBuilder().loadTrustMaterial(new TrustStrategy() {
+        @Override
+        public boolean isTrusted(X509Certificate[] chain, String authType)
+            throws CertificateException {
+          return true;
+        }
+      }).build();
+    } catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException e) {
+      throw new IOException("Unable to create SSL context for Elasticsearch REST client", e);
+    }
+  }
+
+  /**
+   * Queues {@code doc} as an index action in the current bulk request and
+   * triggers {@link #commit()} once either bulk limit is reached.
+   *
+   * <p>The index is always the configured default. The type comes from the
+   * document's "type" metadata and falls back to the configured default.
+   *
+   * @param doc document to index; its "id" field becomes the document id
+   * @throws IOException if the triggered commit fails
+   */
+  @Override
+  public void write(NutchDocument doc) throws IOException {
+    String id = (String) doc.getFieldValue("id");
+    String type = doc.getDocumentMeta().get("type");
+    if (type == null) {
+      type = defaultType;
+    }
+
+    Map<String, Object> source = new HashMap<String, Object>();
+    for (String fieldName : doc.getFieldNames()) {
+      if (doc.getField(fieldName).getValues().size() > 1) {
+        // Send every value of a multi-valued field, not only the first one.
+        source.put(fieldName, doc.getField(fieldName).getValues());
+      } else {
+        source.put(fieldName, doc.getFieldValue(fieldName));
+      }
+      // Approximate the request size by the string length of each value;
+      // null values are skipped rather than causing a NullPointerException.
+      for (Object value : doc.getField(fieldName).getValues()) {
+        if (value != null) {
+          bulkLength += value.toString().length();
+        }
+      }
+    }
+
+    Index indexRequest = new Index.Builder(source)
+        .index(defaultIndex)
+        .type(type)
+        .id(id)
+        .build();
+    bulkBuilder.addAction(indexRequest);
+
+    indexedDocs++;
+    bulkDocs++;
+
+    if (bulkDocs >= maxBulkDocs || bulkLength >= maxBulkLength) {
+      LOG.info("Processing bulk request [docs = {}, length = {}, total docs = {}, "
+          + "last doc in bulk = '{}']", bulkDocs, bulkLength, indexedDocs, id);
+      // Flush the bulk of indexing requests.
+      createNewBulk = true;
+      commit();
+    }
+  }
+
+ @Override
+ public void delete(String key) throws IOException {
+ try {
+ client.execute(new Delete.Builder(key)
+ .index(defaultIndex)
+ .type(defaultType)
+ .build());
+ } catch (IOException e) {
+ LOG.error(ExceptionUtils.getStackTrace(e));
+ throw e;
+ }
+
+ }
+
+ @Override
+ public void update(NutchDocument doc) throws IOException {
+ try {
+ write(doc);
+ } catch (IOException e) {
+ LOG.error(ExceptionUtils.getStackTrace(e));
+ throw e;
+ }
+ }
+
+ @Override
+ public void commit() throws IOException {
+ if (basicFuture != null) {
+ // wait for previous to finish
+ long beforeWait = System.currentTimeMillis();
+ try {
+ JestResult result = basicFuture.get();
+ if (result == null) {
+ throw new RuntimeException();
+ }
+ long msWaited = System.currentTimeMillis() - beforeWait;
+ LOG.info("Previous took in ms " + millis
--- End diff --
Same here: please use SLF4J parameterized logging ({} placeholders) instead of string concatenation.
> Elasticsearch Indexing Over Rest
> --------------------------------
>
> Key: NUTCH-2296
> URL: https://issues.apache.org/jira/browse/NUTCH-2296
> Project: Nutch
> Issue Type: Improvement
> Components: indexer
> Reporter: Brian Zhao
> Priority: Minor
>
> Add the option of REST-based indexing into Elasticsearch via another indexing
> plugin implemented using Jest, which also allows the use of HTTPS.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)