This is an automated email from the ASF dual-hosted git repository.

rzo1 pushed a commit to branch 1198
in repository https://gitbox.apache.org/repos/asf/incubator-stormcrawler.git

commit a483e1c2db9775992f19c27d07a5c320d9d89d0e
Author: Richard Zowalla <[email protected]>
AuthorDate: Fri Apr 26 20:28:55 2024 +0200

    Fix #1198 - Allow addition of custom DNS suffix to OpenSearch nodes found via sniffing
---
 external/opensearch/opensearch-conf.yaml           |  2 +
 .../opensearch/OpenSearchConnection.java           | 12 +++-
 .../SuffixAwareOpenSearchNodesSniffer.java         | 80 ++++++++++++++++++++++
 3 files changed, 91 insertions(+), 3 deletions(-)

diff --git a/external/opensearch/opensearch-conf.yaml b/external/opensearch/opensearch-conf.yaml
index f37b8d5a..4092c99c 100644
--- a/external/opensearch/opensearch-conf.yaml
+++ b/external/opensearch/opensearch-conf.yaml
@@ -7,6 +7,8 @@ config:
   opensearch.addresses: "http://localhost:9200"
   #opensearch.user: "USERNAME"
   #opensearch.password: "PASSWORD"
+  opensearch.sniff: true
+  #opensearch.sniff.dns.host.suffix: some-domain-specific-suffix, which is appended to sniffing results.
   opensearch.concurrentRequests: 2
 
   # Indexer bolt
diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/OpenSearchConnection.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/OpenSearchConnection.java
index d3eb87b7..436f7fc0 100644
--- a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/OpenSearchConnection.java
+++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/OpenSearchConnection.java
@@ -43,6 +43,7 @@ import org.opensearch.client.RequestOptions;
 import org.opensearch.client.RestClient;
 import org.opensearch.client.RestClientBuilder;
 import org.opensearch.client.RestHighLevelClient;
+import org.opensearch.client.sniff.OpenSearchNodesSniffer;
 import org.opensearch.client.sniff.Sniffer;
 import org.opensearch.common.unit.TimeValue;
 import org.slf4j.Logger;
@@ -258,11 +259,16 @@ public final class OpenSearchConnection {
                         .setConcurrentRequests(concurrentRequests)
                         .build();
 
-        boolean sniff =
-                ConfUtils.getBoolean(stormConf, Constants.PARAMPREFIX, 
dottedType, "sniff", true);
+        boolean sniff = ConfUtils.getBoolean(stormConf, Constants.PARAMPREFIX, 
"", "sniff", true);
         Sniffer sniffer = null;
         if (sniff) {
-            sniffer = Sniffer.builder(client.getLowLevelClient()).build();
+            final RestClient c = client.getLowLevelClient();
+            sniffer =
+                    Sniffer.builder(c)
+                            .setNodesSniffer(
+                                    new SuffixAwareOpenSearchNodesSniffer(
+                                            new OpenSearchNodesSniffer(c), 
stormConf))
+                            .build();
         }
 
         return new OpenSearchConnection(client, bulkProcessor, sniffer);
diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/SuffixAwareOpenSearchNodesSniffer.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/SuffixAwareOpenSearchNodesSniffer.java
new file mode 100644
index 00000000..85811ff6
--- /dev/null
+++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/SuffixAwareOpenSearchNodesSniffer.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.stormcrawler.opensearch;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.http.HttpHost;
+import org.apache.stormcrawler.util.ConfUtils;
+import org.apache.tika.utils.StringUtils;
+import org.opensearch.client.Node;
+import org.opensearch.client.sniff.NodesSniffer;
+import org.opensearch.client.sniff.OpenSearchNodesSniffer;
+
+public class SuffixAwareOpenSearchNodesSniffer implements NodesSniffer {
+
+    private final OpenSearchNodesSniffer delegate;
+    private final boolean appendSuffix;
+    private final String dnsSuffix;
+
+    public SuffixAwareOpenSearchNodesSniffer(
+            OpenSearchNodesSniffer delegate, Map<String, Object> stormConf) {
+        this.delegate = delegate;
+        this.dnsSuffix =
+                ConfUtils.getString(
+                        stormConf, Constants.PARAMPREFIX, "", 
"sniff.dns.host.suffix", "");
+        this.appendSuffix =
+                ConfUtils.getBoolean(stormConf, Constants.PARAMPREFIX, "", 
"sniff", true)
+                        && !StringUtils.isBlank(dnsSuffix);
+    }
+
+    @Override
+    public List<Node> sniff() throws IOException {
+        final List<Node> nodes = delegate.sniff();
+        if (appendSuffix) {
+            final List<Node> updated = new ArrayList<>();
+            for (Node n : nodes) {
+                updated.add(copy(n, dnsSuffix));
+            }
+            return updated;
+        } else {
+            return nodes;
+        }
+    }
+
+    private Node copy(Node n, String dnsSuffix) {
+        final Set<HttpHost> boundHosts = new HashSet<>();
+        for (HttpHost boundHost : n.getBoundHosts()) {
+            boundHosts.add(appendSuffix(boundHost, dnsSuffix));
+        }
+        return new Node(
+                appendSuffix(n.getHost(), dnsSuffix),
+                boundHosts,
+                n.getName(),
+                n.getVersion(),
+                n.getRoles(),
+                n.getAttributes());
+    }
+
+    private HttpHost appendSuffix(HttpHost host, String dnsSuffix) {
+        return new HttpHost(host.getHostName() + dnsSuffix, host.getPort(), 
host.getSchemeName());
+    }
+}

Reply via email to