This is an automated email from the ASF dual-hosted git repository. rzo1 pushed a commit to branch 1198 in repository https://gitbox.apache.org/repos/asf/incubator-stormcrawler.git
commit 87148d2535682c53f81a4482699e9d8e894c5f64 Author: Richard Zowalla <[email protected]> AuthorDate: Fri Apr 26 20:49:18 2024 +0200 Implements a draft impl (to be tested with k8s) --- .../opensearch/OpenSearchConnection.java | 8 ++- .../stormcrawler/opensearch/SuffixNodeSniffer.java | 62 +++++++++++++++++++--- 2 files changed, 62 insertions(+), 8 deletions(-) diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/OpenSearchConnection.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/OpenSearchConnection.java index 88203844..67b78e32 100644 --- a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/OpenSearchConnection.java +++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/OpenSearchConnection.java @@ -43,6 +43,7 @@ import org.opensearch.client.RequestOptions; import org.opensearch.client.RestClient; import org.opensearch.client.RestClientBuilder; import org.opensearch.client.RestHighLevelClient; +import org.opensearch.client.sniff.OpenSearchNodesSniffer; import org.opensearch.client.sniff.Sniffer; import org.opensearch.common.unit.TimeValue; import org.slf4j.Logger; @@ -261,7 +262,12 @@ public final class OpenSearchConnection { boolean sniff = ConfUtils.getBoolean(stormConf, Constants.PARAMPREFIX, "", "sniff", true); Sniffer sniffer = null; if (sniff) { - sniffer = Sniffer.builder(client.getLowLevelClient()).build(); + final RestClient c = client.getLowLevelClient(); + sniffer = + Sniffer.builder(c) + .setNodesSniffer( + new SuffixNodeSniffer(new OpenSearchNodesSniffer(c), stormConf)) + .build(); } return new OpenSearchConnection(client, bulkProcessor, sniffer); diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/SuffixNodeSniffer.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/SuffixNodeSniffer.java index 272eebf1..326c39b2 100644 --- a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/SuffixNodeSniffer.java +++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/SuffixNodeSniffer.java @@ -1,9 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.stormcrawler.opensearch; import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; +import org.apache.http.HttpHost; import org.apache.stormcrawler.util.ConfUtils; +import org.apache.tika.utils.StringUtils; import org.opensearch.client.Node; import org.opensearch.client.sniff.NodesSniffer; import org.opensearch.client.sniff.OpenSearchNodesSniffer; @@ -11,21 +32,48 @@ import org.opensearch.client.sniff.OpenSearchNodesSniffer; public class SuffixNodeSniffer implements NodesSniffer { private final OpenSearchNodesSniffer delegate; - private final Map<String, Object> stormConf; - - private final boolean sniff; + private final boolean appendSuffix; + private final String dnsSuffix; public SuffixNodeSniffer(OpenSearchNodesSniffer delegate, Map<String, Object> stormConf) { this.delegate = delegate; - this.stormConf = stormConf; - this.sniff = - ConfUtils.getBoolean(stormConf, Constants.PARAMPREFIX, dottedType, "sniff", true); + this.dnsSuffix = + ConfUtils.getString( + stormConf, Constants.PARAMPREFIX, "", "sniff.dns.host.suffix", ""); + this.appendSuffix = + ConfUtils.getBoolean(stormConf, Constants.PARAMPREFIX, "", "sniff", true) + && !StringUtils.isBlank(dnsSuffix); } @Override public List<Node> sniff() throws IOException { final List<Node> nodes = delegate.sniff(); + if (appendSuffix) { + final List<Node> updated = new ArrayList<>(); + for (Node n : nodes) { + updated.add(copy(n, dnsSuffix)); + } + return updated; + } else { + return nodes; + } + } + + private Node copy(Node n, String dnsSuffix) { + final Set<HttpHost> boundHosts = new HashSet<>(); + for (HttpHost boundHost : n.getBoundHosts()) { + boundHosts.add(appendSuffix(boundHost, dnsSuffix)); + } + return new Node( + appendSuffix(n.getHost(), dnsSuffix), + boundHosts, + n.getName(), + n.getVersion(), + n.getRoles(), + n.getAttributes()); + } - return null; + private HttpHost appendSuffix(HttpHost host, String dnsSuffix) { + return new HttpHost(host.getHostName() + dnsSuffix, host.getPort(), host.getSchemeName()); } }
