This is an automated email from the ASF dual-hosted git repository. rzo1 pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/incubator-stormcrawler.git
commit 1b2b121d6b149eca4b7997d04e976252e75a3188 Author: Julien Nioche <[email protected]> AuthorDate: Fri Oct 25 12:19:42 2024 +0100 Remove references to ES in OpenSearch module Signed-off-by: Julien Nioche <[email protected]> --- .../archetype-resources/opensearch-conf.yaml | 8 ++++---- external/opensearch/opensearch-conf.yaml | 8 ++++---- .../opensearch/OpenSearchConnection.java | 2 +- .../opensearch/filtering/JSONURLFilterWrapper.java | 22 +++++++++++----------- .../opensearch/metrics/MetricsConsumer.java | 6 +++--- .../opensearch/persistence/AggregationSpout.java | 4 ++-- .../opensearch/persistence/HybridSpout.java | 2 +- .../opensearch/persistence/StatusUpdaterBolt.java | 12 ++++++------ .../opensearch/bolt/IndexerBoltTest.java | 2 +- 9 files changed, 33 insertions(+), 33 deletions(-) diff --git a/external/opensearch/archetype/src/main/resources/archetype-resources/opensearch-conf.yaml b/external/opensearch/archetype/src/main/resources/archetype-resources/opensearch-conf.yaml index 0e3998f1..4181f131 100644 --- a/external/opensearch/archetype/src/main/resources/archetype-resources/opensearch-conf.yaml +++ b/external/opensearch/archetype/src/main/resources/archetype-resources/opensearch-conf.yaml @@ -64,15 +64,15 @@ config: # time in secs for which the URLs will be considered for fetching after a ack of fail spout.ttl.purgatory: 30 - # Min time (in msecs) to allow between 2 successive queries to ES + # Min time (in msecs) to allow between 2 successive queries to OpenSearch spout.min.delay.queries: 2000 - # Max time (in msecs) to allow between 2 successive queries to ES + # Max time (in msecs) to allow between 2 successive queries to OpenSearch spout.max.delay.queries: 20000 # Delay since previous query date (in secs) after which the nextFetchDate value will be reset to the current time - # Setting this to -1 or a large value means that the ES will cache the results but also that less and less results - # might be returned. 
+ # Setting this to -1 or a large value means that OpenSearch will cache the results but also that fewer and fewer + # results might be returned. spout.reset.fetchdate.after: 120 opensearch.status.max.buckets: 50 diff --git a/external/opensearch/opensearch-conf.yaml b/external/opensearch/opensearch-conf.yaml index 13bd5697..8ec0341c 100644 --- a/external/opensearch/opensearch-conf.yaml +++ b/external/opensearch/opensearch-conf.yaml @@ -67,15 +67,15 @@ config: # time in secs for which the URLs will be considered for fetching after a ack of fail spout.ttl.purgatory: 30 - # Min time (in msecs) to allow between 2 successive queries to ES + # Min time (in msecs) to allow between 2 successive queries to OpenSearch spout.min.delay.queries: 2000 - # Max time (in msecs) to allow between 2 successive queries to ES + # Max time (in msecs) to allow between 2 successive queries to OpenSearch spout.max.delay.queries: 20000 # Delay since previous query date (in secs) after which the nextFetchDate value will be reset to the current time - # Setting this to -1 or a large value means that the ES will cache the results but also that less and less results - # might be returned. + # Setting this to -1 or a large value means that OpenSearch will cache the results but also that fewer and fewer + # results might be returned. 
spout.reset.fetchdate.after: 120 opensearch.status.max.buckets: 50 diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/OpenSearchConnection.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/OpenSearchConnection.java index 8de2891e..a5c5da46 100644 --- a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/OpenSearchConnection.java +++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/OpenSearchConnection.java @@ -188,7 +188,7 @@ public final class OpenSearchConnection { // for data ); - // TODO check if this has gone somewhere else in ES 7 + // TODO check if this has gone somewhere else // int maxRetryTimeout = ConfUtils.getInt(stormConf, Constants.PARAMPREFIX + // boltType + // ".max.retry.timeout", diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/filtering/JSONURLFilterWrapper.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/filtering/JSONURLFilterWrapper.java index 9d653779..17977b9b 100644 --- a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/filtering/JSONURLFilterWrapper.java +++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/filtering/JSONURLFilterWrapper.java @@ -36,9 +36,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Wraps a URLFilter whose resources are in a JSON file that can be stored in ES. The benefit of + * Wraps a URLFilter whose resources are in a JSON file that can be stored in OpenSearch. The benefit of * doing this is that the resources can be refreshed automatically and modified without having to - * recompile the jar and restart the topology. The connection to ES is done via the config and uses + * recompile the jar and restart the topology. The connection to OpenSearch is done via the config and uses * a new bolt type 'config'. * * <p>The configuration of the delegate is done in the urlfilters.json as usual. 
@@ -59,7 +59,7 @@ import org.slf4j.LoggerFactory; * } * </pre> * - * The resource file can be pushed to ES with + * The resource file can be pushed to OpenSearch with * * <pre> * curl -XPUT 'localhost:9200/config/config/fast.urlfilter.json?pretty' -H 'Content-Type: application/json' -d @fast.urlfilter.json @@ -129,22 +129,22 @@ public class JSONURLFilterWrapper extends URLFilter { new Timer() .schedule( new TimerTask() { - private RestHighLevelClient esClient; + private RestHighLevelClient osClient; public void run() { - if (esClient == null) { + if (osClient == null) { try { - esClient = + osClient = OpenSearchConnection.getClient(stormConf, "config"); } catch (Exception e) { - LOG.error("Exception while creating ES connection", e); + LOG.error("Exception while creating OpenSearch connection", e); } } - if (esClient != null) { - LOG.info("Reloading json resources from ES"); + if (osClient != null) { + LOG.info("Reloading json resources from OpenSearch"); try { GetResponse response = - esClient.get( + osClient.get( new GetRequest( "config", resource.getResourceFile()), @@ -153,7 +153,7 @@ public class JSONURLFilterWrapper extends URLFilter { new ByteArrayInputStream( response.getSourceAsBytes())); } catch (Exception e) { - LOG.error("Can't load config from ES", e); + LOG.error("Can't load config from OpenSearch", e); } } } diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/metrics/MetricsConsumer.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/metrics/MetricsConsumer.java index 3a336783..f806487d 100644 --- a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/metrics/MetricsConsumer.java +++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/metrics/MetricsConsumer.java @@ -38,12 +38,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Sends metrics to an Elasticsearch index. 
The ES details are set in the configuration; an optional + * Sends metrics to an OpenSearch index. The OpenSearch details are set in the configuration; an optional * argument sets a date format to append to the index name. * * <pre> * topology.metrics.consumer.register: - * - class: "org.apache.stormcrawler.elasticsearch.metrics.MetricsConsumer" + * - class: "org.apache.stormcrawler.opensearch.metrics.MetricsConsumer" * parallelism.hint: 1 * argument: "yyyy-MM-dd" * </pre> @@ -156,7 +156,7 @@ public class MetricsConsumer implements IMetricsConsumer { IndexRequest indexRequest = new IndexRequest(getIndexName(timestamp)).source(builder); connection.addToProcessor(indexRequest); } catch (Exception e) { - LOG.error("problem when building request for ES", e); + LOG.error("problem when building request for OpenSearch", e); } } } diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/persistence/AggregationSpout.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/persistence/AggregationSpout.java index 8d786260..2df2e116 100644 --- a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/persistence/AggregationSpout.java +++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/persistence/AggregationSpout.java @@ -59,9 +59,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Spout which pulls URL from an ES index. Use a single instance unless you use 'es.status.routing' + * Spout which pulls URL from an OpenSearch index. Use a single instance unless you use 'es.status.routing' * with the StatusUpdaterBolt, in which case you need to have exactly the same number of spout - * instances as ES shards. Guarantees a good mix of URLs by aggregating them by an arbitrary field + * instances as OpenSearch shards. Guarantees a good mix of URLs by aggregating them by an arbitrary field * e.g. key. 
*/ public class AggregationSpout extends AbstractSpout implements ActionListener<SearchResponse> { diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/persistence/HybridSpout.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/persistence/HybridSpout.java index f901bfbc..08759fa9 100644 --- a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/persistence/HybridSpout.java +++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/persistence/HybridSpout.java @@ -86,7 +86,7 @@ public class HybridSpout extends AggregationSpout implements EmptyQueueListener } // reloading the aggregs - searching now - // would just overload ES and yield + // would just overload OpenSearch and yield // mainly duplicates if (isInQuery.get()) { LOG.trace("{} isInquery true", logIdprefix, queueName); diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/persistence/StatusUpdaterBolt.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/persistence/StatusUpdaterBolt.java index 1fdfd491..1f8ea55a 100644 --- a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/persistence/StatusUpdaterBolt.java +++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/persistence/StatusUpdaterBolt.java @@ -65,7 +65,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Simple bolt which stores the status of URLs into ElasticSearch. Takes the tuples coming from the + * Simple bolt which stores the status of URLs into OpenSearch. Takes the tuples coming from the * 'status' stream. To be used in combination with a Spout to read from the index. 
*/ public class StatusUpdaterBolt extends AbstractStatusUpdaterBolt @@ -156,7 +156,7 @@ public class StatusUpdaterBolt extends AbstractStatusUpdaterBolt routingFieldNameInMetadata = true; fieldNameForRoutingKey = fieldNameForRoutingKey.substring("metadata.".length()); } - // periods are not allowed in ES2 - replace with %2E + // periods are not allowed - replace with %2E fieldNameForRoutingKey = fieldNameForRoutingKey.replaceAll("\\.", "%2E"); } @@ -215,10 +215,10 @@ public class StatusUpdaterBolt extends AbstractStatusUpdaterBolt boolean isAlreadySentAndDiscovered; // need to synchronize: otherwise it might get added to the cache - // without having been sent to ES + // without having been sent to OpenSearch waitAckLock.lock(); try { - // check that the same URL is not being sent to ES + // check that the same URL is not being sent to OpenSearch final var alreadySent = waitAck.getIfPresent(documentID); isAlreadySentAndDiscovered = status.equals(Status.DISCOVERED) && alreadySent != null; } finally { @@ -246,7 +246,7 @@ public class StatusUpdaterBolt extends AbstractStatusUpdaterBolt builder.startObject("metadata"); for (String mdKey : metadata.keySet()) { String[] values = metadata.getValues(mdKey); - // periods are not allowed in ES2 - replace with %2E + // periods are not allowed - replace with %2E mdKey = mdKey.replaceAll("\\.", "%2E"); builder.array(mdKey, values); } @@ -295,7 +295,7 @@ public class StatusUpdaterBolt extends AbstractStatusUpdaterBolt waitAckLock.unlock(); } - LOG.debug("Sending to ES buffer {} with ID {}", url, documentID); + LOG.debug("Sending to OpenSearch buffer {} with ID {}", url, documentID); connection.addToProcessor(request); } diff --git a/external/opensearch/src/test/java/org/apache/stormcrawler/opensearch/bolt/IndexerBoltTest.java b/external/opensearch/src/test/java/org/apache/stormcrawler/opensearch/bolt/IndexerBoltTest.java index e9fb27cf..60afe2f2 --- 
a/external/opensearch/src/test/java/org/apache/stormcrawler/opensearch/bolt/IndexerBoltTest.java +++ b/external/opensearch/src/test/java/org/apache/stormcrawler/opensearch/bolt/IndexerBoltTest.java @@ -67,7 +67,7 @@ class IndexerBoltTest extends AbstractOpenSearchTest { @BeforeEach void setupIndexerBolt() { bolt = new IndexerBolt("content"); - // give the indexer the port for connecting to ES + // give the indexer the port for connecting to OpenSearch final String host = opensearchContainer.getHost(); final Integer port = opensearchContainer.getFirstMappedPort(); final Map<String, Object> conf = new HashMap<>();
