This is an automated email from the ASF dual-hosted git repository.

snagel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nutch.git

commit ca3824fd98290dd7806752decfab6eb9e3b3b569
Author: tallison <talli...@apache.org>
AuthorDate: Fri Feb 24 14:48:55 2023 -0500

    NUTCH-2920 -- first working attempt at migrating ElasticsearchIndexWriter 
to OpenSearch
---
 LICENSE-binary                                     |   1 +
 NOTICE-binary                                      |   4 +
 conf/index-writers.xml.template                    |  27 ++
 src/plugin/build.xml                               |   1 +
 src/plugin/indexer-opensearch-1x/README.md         |  44 +++
 src/plugin/indexer-opensearch-1x/build-ivy.xml     |  47 +++
 src/plugin/indexer-opensearch-1x/build.xml         |  32 ++
 src/plugin/indexer-opensearch-1x/ivy.xml           |  46 +++
 src/plugin/indexer-opensearch-1x/plugin.xml        |  76 ++++
 .../opensearch1x/OpenSearch1xConstants.java        |  38 ++
 .../opensearch1x/OpenSearch1xIndexWriter.java      | 419 +++++++++++++++++++++
 .../indexwriter/opensearch1x/package-info.java     |  22 ++
 12 files changed, 757 insertions(+)

diff --git a/LICENSE-binary b/LICENSE-binary
index d07d0a6a3..8e24a728e 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -505,6 +505,7 @@ org.jetbrains.kotlin:kotlin-stdlib-jdk8
 org.lz4:lz4-java
 org.mapdb:mapdb
 org.netpreserve.commons:webarchive-commons
+org.opensearch.client:opensearch-rest-high-level-client
 org.seleniumhq.selenium:htmlunit-driver
 org.seleniumhq.selenium:selenium-api
 org.seleniumhq.selenium:selenium-chrome-driver
diff --git a/NOTICE-binary b/NOTICE-binary
index 83d65ffaf..1aab2cb41 100644
--- a/NOTICE-binary
+++ b/NOTICE-binary
@@ -1021,6 +1021,10 @@ mapdb (http://www.mapdb.org)
 webarchive-commons (https://github.com/iipc/webarchive-commons)
 - license: The Apache Software License, Version 2.0
 
+# org.opensearch.client:opensearch-rest-high-level-client
+opensearch-rest-high-level-client (https://opensearch.org/)
+- license: The Apache Software License, Version 2.0
+
 # org.ow2.asm:asm
 asm (http://asm.ow2.io/)
 - license: BSD-3-Clause
diff --git a/conf/index-writers.xml.template b/conf/index-writers.xml.template
index 9f5d7916c..221f5affe 100644
--- a/conf/index-writers.xml.template
+++ b/conf/index-writers.xml.template
@@ -128,6 +128,33 @@
       <remove />
     </mapping>
   </writer>
+  <writer id="indexer_opensearch_1x_1" 
class="org.apache.nutch.indexwriter.opensearch1x.OpenSearch1xIndexWriter">
+    <parameters>
+      <param name="host" value="localhost"/>
+      <param name="port" value="9200"/>
+      <param name="scheme" value="https"/><!-- http or https -->
+      <param name="index" value="nutch"/>
+      <param name="username" value="admin"/>
+      <param name="password" value="admin"/>
+      <!--<param name="auth" value="false"/>-->
+      <param name="trust.store.path" value=""/>
+      <param name="trust.store.password" value=""/>
+      <param name="trust.store.type" value="JKS"/>
+      <param name="max.bulk.docs" value="250"/>
+      <param name="max.bulk.size" value="2500500"/>
+      <param name="exponential.backoff.millis" value="100"/>
+      <param name="exponential.backoff.retries" value="10"/>
+      <param name="bulk.close.timeout" value="600"/>
+      <!--<param name="options" value="key1=value1,key2=value2"/>-->
+    </parameters>
+    <mapping>
+      <copy>
+        <field source="title" dest="title,search"/>
+      </copy>
+      <rename />
+      <remove />
+    </mapping>
+  </writer>
   <writer id="indexer_cloud_search_1" 
class="org.apache.nutch.indexwriter.cloudsearch.CloudSearchIndexWriter">
     <parameters>
       <param name="endpoint" value=""/>
diff --git a/src/plugin/build.xml b/src/plugin/build.xml
index db7d4d560..4d900c390 100755
--- a/src/plugin/build.xml
+++ b/src/plugin/build.xml
@@ -54,6 +54,7 @@
     <ant dir="indexer-dummy" target="deploy"/>
     <ant dir="indexer-elastic" target="deploy"/>
     <ant dir="indexer-kafka" target="deploy"/>
+    <ant dir="indexer-opensearch-1x" target="deploy"/>
     <ant dir="indexer-rabbit" target="deploy"/>
     <ant dir="indexer-solr" target="deploy"/>
     <ant dir="language-identifier" target="deploy"/>
diff --git a/src/plugin/indexer-opensearch-1x/README.md 
b/src/plugin/indexer-opensearch-1x/README.md
new file mode 100644
index 000000000..b68557fae
--- /dev/null
+++ b/src/plugin/indexer-opensearch-1x/README.md
@@ -0,0 +1,44 @@
+indexer-opensearch1x plugin for Nutch 
+================================
+
+**indexer-opensearch1x plugin** is used for sending documents from one or more segments to an OpenSearch server. The configuration for the index writers is in the **conf/index-writers.xml** file, included in the official Nutch distribution, and is as follows:
+
+```xml
+<writer id="<writer_id>" 
class="org.apache.nutch.indexwriter.opensearch1x.OpenSearch1xIndexWriter">
+  <mapping>
+    ...
+  </mapping>
+  <parameters>
+    ...
+  </parameters>   
+</writer>
+```
+
+Each `<writer>` element has two mandatory attributes:
+
+* `<writer_id>` is a unique identifier for each configuration. It allows Nutch to distinguish each configuration, even when they are for the same index writer. In addition, it allows multiple instances of the same index writer to be used with different configurations.
+
+* `org.apache.nutch.indexwriter.opensearch1x.OpenSearch1xIndexWriter` corresponds to the canonical name of the class that implements the IndexWriter extension point. This value should not be modified for the **indexer-opensearch1x plugin**.
+
+## Mapping
+
+The mapping section is explained 
[here](https://cwiki.apache.org/confluence/display/NUTCH/IndexWriters#IndexWriters-Mappingsection).
 The structure of this section is general for all index writers.
+
+## Parameters
+
+Each parameter has the form `<param name="<name>" value="<value>"/>` and the 
parameters for this index writer are:
+
+Parameter Name | Description | Default value
+--|--|--
+host | Comma-separated list of hostnames to send documents to using the OpenSearch `RestHighLevelClient`. At least one host must be defined. | 
+port | The port to connect to using the OpenSearch `RestHighLevelClient` (the OpenSearch REST API listens on 9200 by default). | 9300
+scheme | The scheme (http or https) to connect to OpenSearch server. | http
+index | Default index to send documents to. | nutch
+username | Username for auth credentials | elastic
+password | Password for auth credentials | (empty)
+auth | Whether to enable HTTP basic authentication with OpenSearch. Use 
`username` and `password` properties to configure your credentials. | false
+max.bulk.docs | Maximum size of the bulk in number of documents. | 250
+max.bulk.size | Maximum size of the bulk in bytes. | 2500500
+exponential.backoff.millis | Initial delay for the 
[BulkProcessor](https://static.javadoc.io/org.opensearch/opensearch/1.3.8/org/opensearch/action/bulk/BulkProcessor.html)
 exponential backoff policy. | 100
+exponential.backoff.retries | Number of times the 
[BulkProcessor](https://static.javadoc.io/org.opensearch/opensearch/1.3.8/org/opensearch/action/bulk/BulkProcessor.html)
 exponential backoff policy should retry bulk operations. | 10
+bulk.close.timeout | Number of seconds allowed for the 
[BulkProcessor](https://static.javadoc.io/org.opensearch/opensearch/1.3.8/org/opensearch/action/bulk/BulkProcessor.html)
 to complete its last operation. | 600
diff --git a/src/plugin/indexer-opensearch-1x/build-ivy.xml 
b/src/plugin/indexer-opensearch-1x/build-ivy.xml
new file mode 100644
index 000000000..600f80a8b
--- /dev/null
+++ b/src/plugin/indexer-opensearch-1x/build-ivy.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project name="indexer-opensearch-1x" default="deps-jar" 
xmlns:ivy="antlib:org.apache.ivy.ant">
+
+  <property name="ivy.dir" value="../../../ivy" />
+  <property file="../../../default.properties" />
+
+  <target name="download-ivy" unless="offline">
+    <!-- download Ivy from web site so that it can be used even without any 
special installation -->
+    <available file="${ivy.jar}" property="ivy.jar.found"/>
+    <antcall target="ivy-download-unchecked"/>
+  </target>
+
+  <target name="ivy-download-unchecked" unless="ivy.jar.found" 
description="--> fetch any ivy file">
+    <get src="${ivy.repo.url}" dest="${ivy.jar}" usetimestamp="true" />
+  </target>
+
+  <target name="init-ivy" depends="download-ivy">
+    <!-- try to load ivy here from ivy home, in case the user has not already 
dropped
+         it into ant's lib dir (note that the latter copy will always take 
precedence).
+         We will not fail as long as local lib dir exists (it may be empty) and
+         ivy is in at least one of ant's lib dir or the local lib dir. -->
+    <taskdef resource="org/apache/ivy/ant/antlib.xml"
+             uri="antlib:org.apache.ivy.ant" classpath="${ivy.jar}"/>
+    <ivy:settings file="${ivy.dir}/ivysettings.xml" />
+  </target>
+
+  <target name="deps-jar" depends="init-ivy">
+    <ivy:retrieve pattern="lib/[artifact]-[revision].[ext]" sync="true"/>
+  </target>
+
+</project>
diff --git a/src/plugin/indexer-opensearch-1x/build.xml 
b/src/plugin/indexer-opensearch-1x/build.xml
new file mode 100644
index 000000000..feab0e147
--- /dev/null
+++ b/src/plugin/indexer-opensearch-1x/build.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project name="indexer-opensearch-1x" default="jar-core">
+
+  <import file="../build-plugin.xml" />
+
+  <!-- Add compilation dependencies to classpath -->
+  <path id="plugin.deps">
+    <pathelement location="${build.dir}/test/conf"/>
+  </path>
+
+  <!-- Deploy Unit test dependencies -->
+  <target name="deps-test">
+  </target>
+
+
+</project>
diff --git a/src/plugin/indexer-opensearch-1x/ivy.xml 
b/src/plugin/indexer-opensearch-1x/ivy.xml
new file mode 100644
index 000000000..1505ad3c8
--- /dev/null
+++ b/src/plugin/indexer-opensearch-1x/ivy.xml
@@ -0,0 +1,46 @@
+<?xml version='1.0' encoding='utf-8'?>
+<!-- 
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<ivy-module version="1.0">
+  <info organisation="org.apache.nutch" module="${ant.project.name}">
+    <license name="Apache 2.0" />
+    <ivyauthor name="Apache Nutch Team" url="https://nutch.apache.org/" />
+    <description>
+        Apache Nutch
+    </description>
+  </info>
+
+  <configurations>
+    <include file="../../../ivy/ivy-configurations.xml" />
+  </configurations>
+
+  <publications>
+    <!--get the artifact from our module name-->
+    <artifact conf="master" />
+  </publications>
+
+  <dependencies>
+    <dependency org="org.opensearch.client" 
name="opensearch-rest-high-level-client" rev="1.3.8">
+      <!-- exclusions of dependencies provided in Nutch core (ivy/ivy.xml) -->
+      <exclude org="commons-codec" name="commons-codec" />
+      <exclude org="commons-logging" name="commons-logging" />
+      <exclude org="com.tdunning" name="t-digest" />
+      <exclude org="org.apache.logging.log4j" name="log4j-api" />
+    </dependency>
+  </dependencies>
+  
+</ivy-module>
diff --git a/src/plugin/indexer-opensearch-1x/plugin.xml 
b/src/plugin/indexer-opensearch-1x/plugin.xml
new file mode 100644
index 000000000..1bf5affc2
--- /dev/null
+++ b/src/plugin/indexer-opensearch-1x/plugin.xml
@@ -0,0 +1,76 @@
+<?xml version='1.0' encoding='utf-8'?>
+<!-- 
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<plugin id="indexer-opensearch-1x" name="OpenSearch1xIndexWriter" 
version="1.0.0" provider-name="nutch.apache.org">
+  <runtime>
+    <library name="indexer-opensearch-1x.jar">
+      <export name="*" />
+    </library>
+    <!-- OpenSearch Rest Client Dependencies -->
+    <!-- end of OpenSearch Rest Client dependencies -->
+    <library name="HdrHistogram-2.1.9.jar" />
+    <library name="aggs-matrix-stats-client-1.3.8.jar" />
+    <library name="compiler-0.9.10.jar" />
+    <library name="opensearch-1.3.8.jar" />
+    <library name="opensearch-cli-1.3.8.jar" />
+    <library name="opensearch-core-1.3.8.jar" />
+    <library name="opensearch-geo-1.3.8.jar" />
+    <library name="opensearch-plugin-classloader-1.3.8.jar" />
+    <library name="opensearch-rest-client-1.3.8.jar" />
+    <library name="opensearch-rest-high-level-client-1.3.8.jar" />
+    <library name="opensearch-secure-sm-1.3.8.jar" />
+    <library name="opensearch-x-content-1.3.8.jar" />
+    <library name="hppc-0.8.1.jar" />
+    <library name="httpasyncclient-4.1.4.jar" />
+    <library name="httpclient-4.5.13.jar" />
+    <library name="httpcore-4.4.13.jar" />
+    <library name="httpcore-nio-4.4.14.jar" />
+    <library name="jackson-core-2.12.7.jar" />
+    <library name="jackson-dataformat-cbor-2.12.7.jar" />
+    <library name="jackson-dataformat-smile-2.12.7.jar" />
+    <library name="jackson-dataformat-yaml-2.12.7.jar" />
+    <library name="jna-5.5.0.jar" />
+    <library name="joda-time-2.10.12.jar" />
+    <library name="jopt-simple-5.0.4.jar" />
+    <library name="lang-mustache-client-1.3.8.jar" />
+    <library name="log4j-api-2.17.1.jar" />
+    <library name="lucene-analyzers-common-8.10.1.jar" />
+    <library name="lucene-backward-codecs-8.10.1.jar" />
+    <library name="lucene-core-8.10.1.jar" />
+    <library name="lucene-grouping-8.10.1.jar" />
+    <library name="lucene-highlighter-8.10.1.jar" />
+    <library name="lucene-join-8.10.1.jar" />
+    <library name="lucene-memory-8.10.1.jar" />
+    <library name="lucene-misc-8.10.1.jar" />
+    <library name="lucene-queries-8.10.1.jar" />
+    <library name="lucene-queryparser-8.10.1.jar" />
+    <library name="lucene-sandbox-8.10.1.jar" />
+    <library name="lucene-spatial-extras-8.10.1.jar" />
+    <library name="lucene-spatial3d-8.10.1.jar" />
+    <library name="lucene-suggest-8.10.1.jar" />
+    <library name="mapper-extras-client-1.3.8.jar" />
+    <library name="parent-join-client-1.3.8.jar" />
+    <library name="rank-eval-client-1.3.8.jar" />
+    <library name="snakeyaml-1.32.jar" />
+  </runtime>
+  <requires>
+    <import plugin="nutch-extensionpoints" />
+  </requires>
+  <extension id="org.apache.nutch.indexer.opensearch1x" name="OpenSearch Index 
Writer" point="org.apache.nutch.indexer.IndexWriter">
+    <implementation id="OpenSearch1xIndexWriter" 
class="org.apache.nutch.indexwriter.opensearch1x.OpenSearch1xIndexWriter" />
+  </extension>
+</plugin>
\ No newline at end of file
diff --git 
a/src/plugin/indexer-opensearch-1x/src/java/org/apache/nutch/indexwriter/opensearch1x/OpenSearch1xConstants.java
 
b/src/plugin/indexer-opensearch-1x/src/java/org/apache/nutch/indexwriter/opensearch1x/OpenSearch1xConstants.java
new file mode 100644
index 000000000..8ca5038dd
--- /dev/null
+++ 
b/src/plugin/indexer-opensearch-1x/src/java/org/apache/nutch/indexwriter/opensearch1x/OpenSearch1xConstants.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.indexwriter.opensearch1x;
+
+public interface OpenSearch1xConstants { // parameter names read by OpenSearch1xIndexWriter from its writer configuration
+  String HOSTS = "host"; // host name(s) to connect to; mandatory
+  String PORT = "port"; // port used for every configured host
+  String SCHEME = "scheme"; // connection scheme, http or https
+
+  String USER = "username"; // user name for the credentials provider
+  String PASSWORD = "password"; // password for the credentials provider
+  String USE_AUTH = "auth"; // boolean switch: attach the credentials to the HTTP client
+
+  String TRUST_STORE_PATH = "trust.store.path"; // trust store file, read when the scheme is https
+  String TRUST_STORE_PASSWORD = "trust.store.password"; // password used to load the trust store
+  String TRUST_STORE_TYPE = "trust.store.type"; // key store type of the trust store
+  String INDEX = "index"; // default index documents are sent to
+  String MAX_BULK_DOCS = "max.bulk.docs"; // bulk flush threshold in number of actions
+  String MAX_BULK_LENGTH = "max.bulk.size"; // bulk flush threshold in bytes
+  String EXPONENTIAL_BACKOFF_MILLIS = "exponential.backoff.millis"; // initial delay of the exponential backoff policy
+  String EXPONENTIAL_BACKOFF_RETRIES = "exponential.backoff.retries"; // maximum number of backoff retries
+  String BULK_CLOSE_TIMEOUT = "bulk.close.timeout"; // timeout for closing the bulk processor
+  String OPTIONS = "options"; // NOTE(review): not read by the writer code visible here - confirm usage
+}
diff --git 
a/src/plugin/indexer-opensearch-1x/src/java/org/apache/nutch/indexwriter/opensearch1x/OpenSearch1xIndexWriter.java
 
b/src/plugin/indexer-opensearch-1x/src/java/org/apache/nutch/indexwriter/opensearch1x/OpenSearch1xIndexWriter.java
new file mode 100644
index 000000000..c31fbf17d
--- /dev/null
+++ 
b/src/plugin/indexer-opensearch-1x/src/java/org/apache/nutch/indexwriter/opensearch1x/OpenSearch1xIndexWriter.java
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.indexwriter.opensearch1x;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.ssl.SSLContexts;
+import org.apache.nutch.indexer.IndexWriter;
+import org.apache.nutch.indexer.IndexWriterParams;
+import org.apache.nutch.indexer.NutchDocument;
+import org.apache.nutch.indexer.NutchField;
+import org.apache.nutch.util.StringUtil;
+import org.checkerframework.checker.units.qual.K;
+import org.opensearch.action.DocWriteRequest;
+import org.opensearch.action.bulk.BackoffPolicy;
+import org.opensearch.action.bulk.BulkProcessor;
+import org.opensearch.action.bulk.BulkRequest;
+import org.opensearch.action.bulk.BulkResponse;
+import org.opensearch.action.delete.DeleteRequest;
+import org.opensearch.action.index.IndexRequest;
+import org.opensearch.client.RequestOptions;
+import org.opensearch.client.RestClient;
+import org.opensearch.client.RestClientBuilder;
+import org.opensearch.client.RestClientBuilder.HttpClientConfigCallback;
+import org.opensearch.client.RestHighLevelClient;
+import org.opensearch.common.unit.ByteSizeUnit;
+import org.opensearch.common.unit.ByteSizeValue;
+import org.opensearch.common.unit.TimeValue;
+import org.opensearch.common.xcontent.XContentBuilder;
+import org.opensearch.common.xcontent.XContentFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLContext;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.invoke.MethodHandles;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.KeyStore;
+import java.time.format.DateTimeFormatter;
+import java.util.AbstractMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Sends NutchDocuments to a configured OpenSearch index.
+ */
+public class OpenSearch1xIndexWriter implements IndexWriter {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      MethodHandles.lookup().lookupClass());
+
+  private static final int DEFAULT_PORT = 9300; // used when "port" is not configured
+  private static final int DEFAULT_MAX_BULK_DOCS = 250; // used when "max.bulk.docs" is not configured
+  private static final int DEFAULT_MAX_BULK_LENGTH = 2500500; // bytes; used when "max.bulk.size" is not configured
+  private static final int DEFAULT_EXP_BACKOFF_MILLIS = 100; // used when "exponential.backoff.millis" is not configured
+  private static final int DEFAULT_EXP_BACKOFF_RETRIES = 10; // used when "exponential.backoff.retries" is not configured
+  private static final int DEFAULT_BULK_CLOSE_TIMEOUT = 600; // used when "bulk.close.timeout" is not configured
+  private static final String DEFAULT_INDEX = "nutch"; // used when "index" is not configured
+  private static final String DEFAULT_USER = "elastic"; // used when "username" is not configured
+
+  private String[] hosts; // set in makeClient from the "host" parameter
+  private int port; // set in makeClient
+  private String scheme = HttpHost.DEFAULT_SCHEME_NAME; // overwritten in makeClient from the "scheme" parameter
+  private String user = null; // set in makeClient
+  private String password = null; // set in makeClient
+  private boolean auth; // whether credentials are attached to the HTTP client
+
+  private String trustStorePath; // set in makeClient; read when the scheme is https
+  private String trustStorePassword;
+  private String trustStoreType;
+  private int maxBulkDocs; // BulkProcessor flush threshold (number of actions)
+  private int maxBulkLength; // BulkProcessor flush threshold (bytes)
+  private int expBackoffMillis; // initial delay of the exponential backoff policy
+  private int expBackoffRetries; // maximum number of backoff retries
+
+  private String defaultIndex;
+  private RestHighLevelClient client; // created in open(IndexWriterParams)
+  private BulkProcessor bulkProcessor; // created in open(IndexWriterParams)
+
+  private long bulkCloseTimeout;
+
+  private Configuration config; // NOTE(review): not assigned in the part of the class visible here - confirm
+
+  @Override
+  public void open(Configuration conf, String name) throws IOException {
+    // Intentionally empty: this writer is set up in open(IndexWriterParams).
+  }
+
+  /**
+   * Initializes the internal variables from a given index writer
+   * configuration, creates the REST client and sets up the bulk processor
+   * that batches all subsequent requests.
+   *
+   * @param parameters
+   *     Params from the index writer configuration.
+   * @throws IOException
+   *     if the REST client cannot be created.
+   * @throws RuntimeException
+   *     if the mandatory {@code host} parameter is missing or blank.
+   */
+  @Override
+  public void open(IndexWriterParams parameters) throws IOException {
+
+    // Fail fast: without a host there is nothing to connect to.
+    String configuredHosts = parameters.get(OpenSearch1xConstants.HOSTS);
+    if (StringUtils.isBlank(configuredHosts)) {
+      String message = "Missing " + OpenSearch1xConstants.HOSTS
+          + ". This should be set in index-writers.xml ";
+      message += "\n" + describe();
+      LOG.error(message);
+      throw new RuntimeException(message);
+    }
+
+    bulkCloseTimeout = parameters.getLong(
+        OpenSearch1xConstants.BULK_CLOSE_TIMEOUT, DEFAULT_BULK_CLOSE_TIMEOUT);
+    defaultIndex = parameters.get(OpenSearch1xConstants.INDEX, DEFAULT_INDEX);
+
+    maxBulkDocs = parameters.getInt(OpenSearch1xConstants.MAX_BULK_DOCS,
+        DEFAULT_MAX_BULK_DOCS);
+    maxBulkLength = parameters.getInt(OpenSearch1xConstants.MAX_BULK_LENGTH,
+        DEFAULT_MAX_BULK_LENGTH);
+    expBackoffMillis = parameters.getInt(
+        OpenSearch1xConstants.EXPONENTIAL_BACKOFF_MILLIS,
+        DEFAULT_EXP_BACKOFF_MILLIS);
+    expBackoffRetries = parameters.getInt(
+        OpenSearch1xConstants.EXPONENTIAL_BACKOFF_RETRIES,
+        DEFAULT_EXP_BACKOFF_RETRIES);
+
+    client = makeClient(parameters);
+
+    LOG.debug("Creating BulkProcessor with maxBulkDocs={}, maxBulkLength={}",
+        maxBulkDocs, maxBulkLength);
+
+    // A bulk is flushed when either the document count or the byte size
+    // threshold is reached; failed bulks are retried with exponential backoff.
+    bulkProcessor = BulkProcessor.builder(
+            (request, bulkListener) -> client.bulkAsync(request,
+                RequestOptions.DEFAULT, bulkListener), bulkProcessorListener())
+        .setBulkActions(maxBulkDocs)
+        .setBulkSize(new ByteSizeValue(maxBulkLength, ByteSizeUnit.BYTES))
+        .setConcurrentRequests(1).setBackoffPolicy(
+            BackoffPolicy.exponentialBackoff(
+                TimeValue.timeValueMillis(expBackoffMillis),
+                expBackoffRetries))
+        .build();
+  }
+
+  /**
+   * Generates a RestHighLevelClient with the hosts given. The connection
+   * settings read from the parameters are also stored in the corresponding
+   * fields of this writer.
+   *
+   * Credentials are attached to the HTTP client whenever {@code auth} is
+   * enabled. For the {@code https} scheme the configured trust store (if any)
+   * is used to verify the server certificate; without a usable trust store
+   * the HTTP client's default SSL settings apply.
+   *
+   * @param parameters
+   *     implementation specific
+   *     {@link org.apache.nutch.indexer.IndexWriterParams}
+   * @return an initialized {@link org.opensearch.client.RestHighLevelClient}
+   * @throws IOException
+   *     if no host or no valid port is configured
+   */
+  protected RestHighLevelClient makeClient(IndexWriterParams parameters)
+      throws IOException {
+    hosts = parameters.getStrings(OpenSearch1xConstants.HOSTS);
+    port = parameters.getInt(OpenSearch1xConstants.PORT, DEFAULT_PORT);
+    scheme = parameters.get(OpenSearch1xConstants.SCHEME,
+        HttpHost.DEFAULT_SCHEME_NAME);
+    auth = parameters.getBoolean(OpenSearch1xConstants.USE_AUTH, false);
+    user = parameters.get(OpenSearch1xConstants.USER, DEFAULT_USER);
+    password = parameters.get(OpenSearch1xConstants.PASSWORD, "");
+    trustStorePath = parameters.get(OpenSearch1xConstants.TRUST_STORE_PATH);
+    trustStorePassword = parameters.get(
+        OpenSearch1xConstants.TRUST_STORE_PASSWORD);
+    trustStoreType = parameters.get(OpenSearch1xConstants.TRUST_STORE_TYPE,
+        "JKS");
+
+    if (hosts == null || hosts.length == 0 || port < 1) {
+      throw new IOException(
+          "OpenSearch RestClient initialization failed! "
+              + "Please provide the hosts and a valid port");
+    }
+
+    final CredentialsProvider credentialsProvider =
+        new BasicCredentialsProvider();
+    credentialsProvider.setCredentials(AuthScope.ANY,
+        new UsernamePasswordCredentials(user, password));
+
+    HttpHost[] hostsList = new HttpHost[hosts.length];
+    for (int i = 0; i < hosts.length; i++) {
+      hostsList[i] = new HttpHost(hosts[i], port, scheme);
+    }
+    RestClientBuilder restClientBuilder = RestClient.builder(hostsList);
+
+    // Only HTTPS needs a custom SSL context; null means "use the defaults".
+    final SSLContext sslContext =
+        "https".equals(scheme) ? loadTrustStoreContext() : null;
+    final boolean useAuth = auth;
+
+    // A single callback configures both concerns, so a missing or broken
+    // trust store can no longer prevent the credentials from being applied.
+    // Host name verification is left at the HTTP client's default.
+    if (useAuth || sslContext != null) {
+      restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
+        if (useAuth) {
+          httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
+        }
+        if (sslContext != null) {
+          httpClientBuilder.setSSLContext(sslContext);
+        }
+        return httpClientBuilder;
+      });
+    }
+
+    return new RestHighLevelClient(restClientBuilder);
+  }
+
+  /**
+   * Builds an SSLContext that trusts the certificates contained in the
+   * configured trust store.
+   *
+   * @return the SSL context, or {@code null} if no trust store path is
+   *     configured or the trust store cannot be loaded (the failure is
+   *     logged)
+   */
+  private SSLContext loadTrustStoreContext() {
+    if (StringUtils.isBlank(trustStorePath)) {
+      LOG.info("No {} configured, using the default SSL context",
+          OpenSearch1xConstants.TRUST_STORE_PATH);
+      return null;
+    }
+    try {
+      // Honor the configured store type instead of assuming JKS.
+      KeyStore trustStore = KeyStore.getInstance(trustStoreType);
+      // KeyStore.load accepts a null password (no integrity check).
+      char[] storePassword = trustStorePassword == null ? null
+          : trustStorePassword.toCharArray();
+      try (InputStream is = Files.newInputStream(
+          Paths.get(trustStorePath))) {
+        trustStore.load(is, storePassword);
+      }
+      SSLContextBuilder sslBuilder = SSLContexts.custom();
+      sslBuilder.loadTrustMaterial(trustStore, null);
+      return sslBuilder.build();
+    } catch (Exception e) {
+      // Best effort, as before: log and continue with the default SSL setup.
+      LOG.error("Error setting up SSLContext because: " + e.getMessage(),
+          e);
+      return null;
+    }
+  }
+
+  /**
+   * Creates the default {@link BulkProcessor.Listener} used by this writer.
+   * <p>
+   * The listener only logs: it does nothing before a bulk request, logs at
+   * error level when a bulk request ends with a {@link Throwable}, and logs
+   * at warn level when a bulk response reports failures.
+   *
+   * @return {@link BulkProcessor.Listener}
+   */
+  protected BulkProcessor.Listener bulkProcessorListener() {
+    return new BulkProcessor.Listener() {
+      @Override
+      public void beforeBulk(long executionId, BulkRequest request) {
+        // intentionally empty: nothing to do before a bulk request
+      }
+
+      @Override
+      public void afterBulk(long executionId, BulkRequest request,
+          Throwable failure) {
+        // this writer indexes into OpenSearch, so name that backend in the
+        // log instead of the Elasticsearch plugin this code was derived from
+        LOG.error("OpenSearch indexing failed:", failure);
+      }
+
+      @Override
+      public void afterBulk(long executionId, BulkRequest request,
+          BulkResponse response) {
+        // only log when the response reports failures
+        if (response.hasFailures()) {
+          LOG.warn("Failures occurred during bulk request: {}",
+              response.buildFailureMessage());
+        }
+      }
+    };
+  }
+
+  /**
+   * Converts a document to a JSON source and queues an index request for it
+   * on the bulk processor.
+   * <p>
+   * Fields with more than one value are written with
+   * {@code XContentBuilder.array}. A single {@link java.util.Date} value is
+   * written as an ISO-8601 instant string. Fields without any value are
+   * skipped.
+   *
+   * @param doc
+   *          the document to index; the value of its "id" field is used as
+   *          the id of the index request
+   * @throws IOException
+   *           if building the JSON source fails
+   */
+  @Override
+  public void write(NutchDocument doc) throws IOException {
+    String id = (String) doc.getFieldValue("id");
+
+    // Add each field of this doc to the index builder
+    XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
+    for (final Map.Entry<String, NutchField> e : doc) {
+      final List<Object> values = e.getValue().getValues();
+
+      if (values.isEmpty()) {
+        // a field without values has nothing to index; without this guard
+        // values.get(0) below throws IndexOutOfBoundsException
+        continue;
+      }
+
+      if (values.size() > 1) {
+        builder.array(e.getKey(), values);
+      } else {
+        Object value = values.get(0);
+        if (value instanceof java.util.Date) {
+          value = DateTimeFormatter.ISO_INSTANT.format(
+              ((java.util.Date) value).toInstant());
+        }
+        builder.field(e.getKey(), value);
+      }
+    }
+    builder.endObject();
+
+    IndexRequest request = new IndexRequest(defaultIndex).id(id)
+        .source(builder);
+    request.opType(DocWriteRequest.OpType.INDEX);
+
+    bulkProcessor.add(request);
+  }
+
+  /**
+   * Queues a delete request for the given key on the bulk processor.
+   *
+   * @param key
+   *          id of the document to delete from the default index
+   * @throws IOException
+   *           declared by the overridden method
+   */
+  @Override
+  public void delete(String key) throws IOException {
+    bulkProcessor.add(new DeleteRequest(defaultIndex, key));
+  }
+
+  /**
+   * Handles an update exactly like a write by delegating to
+   * {@link #write(NutchDocument)}.
+   *
+   * @param doc
+   *          the document to write
+   * @throws IOException
+   *           if {@link #write(NutchDocument)} fails
+   */
+  @Override
+  public void update(NutchDocument doc) throws IOException {
+    this.write(doc);
+  }
+
+  /**
+   * Flushes the bulk processor.
+   *
+   * @throws IOException
+   *           declared by the overridden method
+   */
+  @Override
+  public void commit() throws IOException {
+    this.bulkProcessor.flush();
+  }
+
+  @Override
+  public void close() throws IOException {
+    // Close BulkProcessor (automatically flushes)
+    try {
+      bulkProcessor.awaitClose(bulkCloseTimeout, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOG.warn("interrupted while waiting for BulkProcessor to complete ({})",
+          e.getMessage());
+    }
+
+    client.close();
+  }
+
+  /**
+   * Returns {@link Map} with the specific parameters the IndexWriter instance
+   * can take.
+   * <p>
+   * Entries are returned in the order they are added here. The password and
+   * the trust store password are passed through {@code StringUtil.mask}
+   * before being added.
+   *
+   * @return The values of each row. It must have the form
+   *     &#60;KEY,&#60;DESCRIPTION,VALUE&#62;&#62;.
+   */
+  @Override
+  public Map<String, Map.Entry<String, Object>> describe() {
+    Map<String, Map.Entry<String, Object>> properties = new LinkedHashMap<>();
+
+    // connection settings
+    properties.put(OpenSearch1xConstants.HOSTS,
+        new AbstractMap.SimpleEntry<>("Comma-separated list of hostnames",
+            this.hosts == null ? "" : String.join(",", hosts)));
+    properties.put(OpenSearch1xConstants.PORT,
+        new AbstractMap.SimpleEntry<>(
+            "The port to connect to OpenSearch server.", this.port));
+    properties.put(OpenSearch1xConstants.SCHEME, new AbstractMap.SimpleEntry<>(
+        "The scheme (http or https) to connect to OpenSearch server.",
+        this.scheme));
+    properties.put(OpenSearch1xConstants.INDEX,
+        new AbstractMap.SimpleEntry<>("Default index to send documents to.",
+            this.defaultIndex));
+
+    // credentials and trust store; secrets are masked
+    properties.put(OpenSearch1xConstants.USER,
+        new AbstractMap.SimpleEntry<>("Username for auth credentials",
+            this.user));
+    properties.put(OpenSearch1xConstants.PASSWORD,
+        new AbstractMap.SimpleEntry<>("Password for auth credentials",
+            StringUtil.mask(this.password)));
+    properties.put(OpenSearch1xConstants.TRUST_STORE_PATH,
+        new AbstractMap.SimpleEntry<>("Trust store path",
+            this.trustStorePath));
+    properties.put(OpenSearch1xConstants.TRUST_STORE_PASSWORD,
+        new AbstractMap.SimpleEntry<>("Password for trust store",
+            StringUtil.mask(this.trustStorePassword)));
+    properties.put(OpenSearch1xConstants.TRUST_STORE_TYPE,
+        new AbstractMap.SimpleEntry<>("Trust store type (default=JKS)",
+            this.trustStoreType));
+
+    // bulk processor settings
+    properties.put(OpenSearch1xConstants.MAX_BULK_DOCS,
+        new AbstractMap.SimpleEntry<>(
+            "Maximum size of the bulk in number of documents.",
+            this.maxBulkDocs));
+    properties.put(OpenSearch1xConstants.MAX_BULK_LENGTH,
+        new AbstractMap.SimpleEntry<>("Maximum size of the bulk in bytes.",
+            this.maxBulkLength));
+    properties.put(OpenSearch1xConstants.EXPONENTIAL_BACKOFF_MILLIS,
+        new AbstractMap.SimpleEntry<>(
+            "Initial delay for the BulkProcessor exponential backoff policy.",
+            this.expBackoffMillis));
+    properties.put(OpenSearch1xConstants.EXPONENTIAL_BACKOFF_RETRIES,
+        new AbstractMap.SimpleEntry<>(
+            "Number of times the BulkProcessor exponential backoff policy "
+                + "should retry bulk operations.",
+            this.expBackoffRetries));
+    properties.put(OpenSearch1xConstants.BULK_CLOSE_TIMEOUT,
+        new AbstractMap.SimpleEntry<>(
+            "Number of seconds allowed for the BulkProcessor to complete its "
+                + "last operation.",
+            this.bulkCloseTimeout));
+
+    return properties;
+  }
+
+  /**
+   * Stores the given configuration on this writer.
+   *
+   * @param conf
+   *          the configuration to keep
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    this.config = conf;
+  }
+
+  /**
+   * Returns the configuration previously stored with
+   * {@link #setConf(Configuration)}.
+   *
+   * @return the stored configuration
+   */
+  @Override
+  public Configuration getConf() {
+    return this.config;
+  }
+}
diff --git a/src/plugin/indexer-opensearch-1x/src/java/org/apache/nutch/indexwriter/opensearch1x/package-info.java b/src/plugin/indexer-opensearch-1x/src/java/org/apache/nutch/indexwriter/opensearch1x/package-info.java
new file mode 100644
index 000000000..e5d18876b
--- /dev/null
+++ b/src/plugin/indexer-opensearch-1x/src/java/org/apache/nutch/indexwriter/opensearch1x/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Index writer plugin for <a href="https://opensearch.org">OpenSearch</a>.
+ */
+package org.apache.nutch.indexwriter.opensearch1x;
+


Reply via email to