This is an automated email from the ASF dual-hosted git repository.

rzo1 pushed a commit to branch 1198
in repository https://gitbox.apache.org/repos/asf/incubator-stormcrawler.git

commit 87148d2535682c53f81a4482699e9d8e894c5f64
Author: Richard Zowalla <[email protected]>
AuthorDate: Fri Apr 26 20:49:18 2024 +0200

    Implements a draft impl (to be tested with k8s)
---
 .../opensearch/OpenSearchConnection.java           |  8 ++-
 .../stormcrawler/opensearch/SuffixNodeSniffer.java | 62 +++++++++++++++++++---
 2 files changed, 62 insertions(+), 8 deletions(-)

diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/OpenSearchConnection.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/OpenSearchConnection.java
index 88203844..67b78e32 100644
--- a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/OpenSearchConnection.java
+++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/OpenSearchConnection.java
@@ -43,6 +43,7 @@ import org.opensearch.client.RequestOptions;
 import org.opensearch.client.RestClient;
 import org.opensearch.client.RestClientBuilder;
 import org.opensearch.client.RestHighLevelClient;
+import org.opensearch.client.sniff.OpenSearchNodesSniffer;
 import org.opensearch.client.sniff.Sniffer;
 import org.opensearch.common.unit.TimeValue;
 import org.slf4j.Logger;
@@ -261,7 +262,12 @@ public final class OpenSearchConnection {
         boolean sniff = ConfUtils.getBoolean(stormConf, Constants.PARAMPREFIX, "", "sniff", true);
         Sniffer sniffer = null;
         if (sniff) {
-            sniffer = Sniffer.builder(client.getLowLevelClient()).build();
+            final RestClient c = client.getLowLevelClient();
+            sniffer =
+                    Sniffer.builder(c)
+                            .setNodesSniffer(
+                                    new SuffixNodeSniffer(new OpenSearchNodesSniffer(c), stormConf))
+                            .build();
         }
 
         return new OpenSearchConnection(client, bulkProcessor, sniffer);
diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/SuffixNodeSniffer.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/SuffixNodeSniffer.java
index 272eebf1..326c39b2 100644
--- a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/SuffixNodeSniffer.java
+++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/SuffixNodeSniffer.java
@@ -1,9 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.stormcrawler.opensearch;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import org.apache.http.HttpHost;
 import org.apache.stormcrawler.util.ConfUtils;
+import org.apache.tika.utils.StringUtils;
 import org.opensearch.client.Node;
 import org.opensearch.client.sniff.NodesSniffer;
 import org.opensearch.client.sniff.OpenSearchNodesSniffer;
@@ -11,21 +32,48 @@ import org.opensearch.client.sniff.OpenSearchNodesSniffer;
 public class SuffixNodeSniffer implements NodesSniffer {
 
     private final OpenSearchNodesSniffer delegate;
-    private final Map<String, Object> stormConf;
-
-    private final boolean sniff;
+    private final boolean appendSuffix;
+    private final String dnsSuffix;
 
     public SuffixNodeSniffer(OpenSearchNodesSniffer delegate, Map<String, Object> stormConf) {
         this.delegate = delegate;
-        this.stormConf = stormConf;
-        this.sniff =
-                ConfUtils.getBoolean(stormConf, Constants.PARAMPREFIX, dottedType, "sniff", true);
+        this.dnsSuffix =
+                ConfUtils.getString(
+                        stormConf, Constants.PARAMPREFIX, "", "sniff.dns.host.suffix", "");
+        this.appendSuffix =
+                ConfUtils.getBoolean(stormConf, Constants.PARAMPREFIX, "", "sniff", true)
+                        && !StringUtils.isBlank(dnsSuffix);
     }
 
     @Override
     public List<Node> sniff() throws IOException {
         final List<Node> nodes = delegate.sniff();
+        if (appendSuffix) {
+            final List<Node> updated = new ArrayList<>();
+            for (Node n : nodes) {
+                updated.add(copy(n, dnsSuffix));
+            }
+            return updated;
+        } else {
+            return nodes;
+        }
+    }
+
+    private Node copy(Node n, String dnsSuffix) {
+        final Set<HttpHost> boundHosts = new HashSet<>();
+        for (HttpHost boundHost : n.getBoundHosts()) {
+            boundHosts.add(appendSuffix(boundHost, dnsSuffix));
+        }
+        return new Node(
+                appendSuffix(n.getHost(), dnsSuffix),
+                boundHosts,
+                n.getName(),
+                n.getVersion(),
+                n.getRoles(),
+                n.getAttributes());
+    }
 
-        return null;
+    private HttpHost appendSuffix(HttpHost host, String dnsSuffix) {
+        return new HttpHost(host.getHostName() + dnsSuffix, host.getPort(), host.getSchemeName());
     }
 }

Reply via email to