This is an automated email from the ASF dual-hosted git repository. rzo1 pushed a commit to branch 1198 in repository https://gitbox.apache.org/repos/asf/incubator-stormcrawler.git
commit a483e1c2db9775992f19c27d07a5c320d9d89d0e Author: Richard Zowalla <[email protected]> AuthorDate: Fri Apr 26 20:28:55 2024 +0200 Fix #1198 - Allow addition of custom DNS suffix to OpenSearch nodes found via sniffing --- external/opensearch/opensearch-conf.yaml | 2 + .../opensearch/OpenSearchConnection.java | 12 +++- .../SuffixAwareOpenSearchNodesSniffer.java | 80 ++++++++++++++++++++++ 3 files changed, 91 insertions(+), 3 deletions(-) diff --git a/external/opensearch/opensearch-conf.yaml b/external/opensearch/opensearch-conf.yaml index f37b8d5a..4092c99c 100644 --- a/external/opensearch/opensearch-conf.yaml +++ b/external/opensearch/opensearch-conf.yaml @@ -7,6 +7,8 @@ config: opensearch.addresses: "http://localhost:9200" #opensearch.user: "USERNAME" #opensearch.password: "PASSWORD" + opensearch.sniff: true + #opensearch.sniff.dns.host.suffix: some-domain-specific-suffix, which is appended to sniffing results. opensearch.concurrentRequests: 2 # Indexer bolt diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/OpenSearchConnection.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/OpenSearchConnection.java index d3eb87b7..436f7fc0 100644 --- a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/OpenSearchConnection.java +++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/OpenSearchConnection.java @@ -43,6 +43,7 @@ import org.opensearch.client.RequestOptions; import org.opensearch.client.RestClient; import org.opensearch.client.RestClientBuilder; import org.opensearch.client.RestHighLevelClient; +import org.opensearch.client.sniff.OpenSearchNodesSniffer; import org.opensearch.client.sniff.Sniffer; import org.opensearch.common.unit.TimeValue; import org.slf4j.Logger; @@ -258,11 +259,16 @@ public final class OpenSearchConnection { .setConcurrentRequests(concurrentRequests) .build(); - boolean sniff = - ConfUtils.getBoolean(stormConf, Constants.PARAMPREFIX, dottedType, "sniff", true); + boolean sniff = ConfUtils.getBoolean(stormConf, Constants.PARAMPREFIX, "", "sniff", true); Sniffer sniffer = null; if (sniff) { - sniffer = Sniffer.builder(client.getLowLevelClient()).build(); + final RestClient c = client.getLowLevelClient(); + sniffer = + Sniffer.builder(c) + .setNodesSniffer( + new SuffixAwareOpenSearchNodesSniffer( + new OpenSearchNodesSniffer(c), stormConf)) + .build(); } return new OpenSearchConnection(client, bulkProcessor, sniffer); diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/SuffixAwareOpenSearchNodesSniffer.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/SuffixAwareOpenSearchNodesSniffer.java new file mode 100644 index 00000000..85811ff6 --- /dev/null +++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/SuffixAwareOpenSearchNodesSniffer.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.stormcrawler.opensearch; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.http.HttpHost; +import org.apache.stormcrawler.util.ConfUtils; +import org.apache.tika.utils.StringUtils; +import org.opensearch.client.Node; +import org.opensearch.client.sniff.NodesSniffer; +import org.opensearch.client.sniff.OpenSearchNodesSniffer; + +public class SuffixAwareOpenSearchNodesSniffer implements NodesSniffer { + + private final OpenSearchNodesSniffer delegate; + private final boolean appendSuffix; + private final String dnsSuffix; + + public SuffixAwareOpenSearchNodesSniffer( + OpenSearchNodesSniffer delegate, Map<String, Object> stormConf) { + this.delegate = delegate; + this.dnsSuffix = + ConfUtils.getString( + stormConf, Constants.PARAMPREFIX, "", "sniff.dns.host.suffix", ""); + this.appendSuffix = + ConfUtils.getBoolean(stormConf, Constants.PARAMPREFIX, "", "sniff", true) + && !StringUtils.isBlank(dnsSuffix); + } + + @Override + public List<Node> sniff() throws IOException { + final List<Node> nodes = delegate.sniff(); + if (appendSuffix) { + final List<Node> updated = new ArrayList<>(); + for (Node n : nodes) { + updated.add(copy(n, dnsSuffix)); + } + return updated; + } else { + return nodes; + } + } + + private Node copy(Node n, String dnsSuffix) { + final Set<HttpHost> boundHosts = new HashSet<>(); + for (HttpHost boundHost : n.getBoundHosts()) { + boundHosts.add(appendSuffix(boundHost, dnsSuffix)); + } + return new Node( + appendSuffix(n.getHost(), dnsSuffix), + boundHosts, + n.getName(), + n.getVersion(), + n.getRoles(), + n.getAttributes()); + } + + private HttpHost appendSuffix(HttpHost host, String dnsSuffix) { + return new HttpHost(host.getHostName() + dnsSuffix, host.getPort(), host.getSchemeName()); + } +}
