This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 5db3258eb04b550bc3f992daf7f37d01a465f43b Author: Lari Hotari <[email protected]> AuthorDate: Wed Jun 28 21:10:05 2023 +0300 [improve][test] Disable disk usage threshold & geoip download and enable logging for Elastic Testcontainers (#20671) (cherry picked from commit e97fe5b8ac376ecead0097b184f0f1436fce88e4) --- .../io/elasticsearch/ElasticSearchTestBase.java | 32 +++++++++++------ .../io/sinks/ElasticSearch7SinkTester.java | 4 +-- .../io/sinks/ElasticSearch8SinkTester.java | 5 ++- .../io/sinks/ElasticSearchSinkTester.java | 40 +++++++++++++++++----- .../integration/io/sinks/OpenSearchSinkTester.java | 13 +++---- .../integration/topologies/PulsarCluster.java | 2 ++ 6 files changed, 66 insertions(+), 30 deletions(-) diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchTestBase.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchTestBase.java index 4c6fd020fa3..0f5a42051c7 100644 --- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchTestBase.java +++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchTestBase.java @@ -18,10 +18,6 @@ */ package org.apache.pulsar.io.elasticsearch; -import java.io.IOException; -import java.util.Map; -import java.util.Optional; - import co.elastic.clients.elasticsearch.ElasticsearchClient; import co.elastic.clients.elasticsearch.security.CreateApiKeyRequest; import co.elastic.clients.elasticsearch.security.CreateApiKeyResponse; @@ -29,6 +25,10 @@ import co.elastic.clients.elasticsearch.security.GetTokenRequest; import co.elastic.clients.elasticsearch.security.GetTokenResponse; import co.elastic.clients.elasticsearch.security.get_token.AccessTokenGrantType; import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.util.Map; +import java.util.Optional; +import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.io.elasticsearch.client.elastic.ElasticSearchJavaRestClient; import org.apache.pulsar.io.elasticsearch.client.opensearch.OpenSearchHighLevelRestClient; import org.opensearch.client.Request; @@ -36,10 +36,11 @@ import org.opensearch.client.Response; import org.testcontainers.elasticsearch.ElasticsearchContainer; import org.testcontainers.utility.DockerImageName; +@Slf4j public abstract class ElasticSearchTestBase { public static final String ELASTICSEARCH_8 = Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE_V8")) - .orElse("docker.elastic.co/elasticsearch/elasticsearch:8.5.1"); + .orElse("docker.elastic.co/elasticsearch/elasticsearch:8.5.3"); public static final String ELASTICSEARCH_7 = Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE_V7")) .orElse("docker.elastic.co/elasticsearch/elasticsearch:7.17.7"); @@ -54,17 +55,28 @@ public abstract class ElasticSearchTestBase { } protected ElasticsearchContainer createElasticsearchContainer() { + ElasticsearchContainer elasticsearchContainer; if (elasticImageName.equals(OPENSEARCH)) { DockerImageName dockerImageName = DockerImageName.parse(OPENSEARCH).asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch"); - return new ElasticsearchContainer(dockerImageName) + elasticsearchContainer = new ElasticsearchContainer(dockerImageName) .withEnv("OPENSEARCH_JAVA_OPTS", "-Xms128m -Xmx256m") .withEnv("bootstrap.memory_lock", "true") .withEnv("plugins.security.disabled", "true"); + } else { + elasticsearchContainer = new ElasticsearchContainer(elasticImageName) + .withEnv("ES_JAVA_OPTS", "-Xms128m -Xmx256m") + .withEnv("xpack.security.enabled", "false") + .withEnv("xpack.security.http.ssl.enabled", "false"); + } + configureElasticContainer(elasticsearchContainer); + return elasticsearchContainer; + } + + protected void configureElasticContainer(ElasticsearchContainer elasticContainer) { + if (getCompatibilityMode() != ElasticSearchConfig.CompatibilityMode.OPENSEARCH) { + elasticContainer.withEnv("ingest.geoip.downloader.enabled", "false"); } - return new ElasticsearchContainer(elasticImageName) - .withEnv("ES_JAVA_OPTS", "-Xms128m -Xmx256m") - .withEnv("xpack.security.enabled", "false") - .withEnv("xpack.security.http.ssl.enabled", "false"); + elasticContainer.withLogConsumer(o -> log.info("elastic> {}", o.getUtf8String())); } protected ElasticSearchConfig.CompatibilityMode getCompatibilityMode() { diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch7SinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch7SinkTester.java index 65b38c677bf..d99fcad2527 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch7SinkTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch7SinkTester.java @@ -19,7 +19,6 @@ package org.apache.pulsar.tests.integration.io.sinks; import java.util.Optional; -import org.apache.pulsar.tests.integration.topologies.PulsarCluster; import org.testcontainers.elasticsearch.ElasticsearchContainer; public class ElasticSearch7SinkTester extends ElasticSearchSinkTester { @@ -32,8 +31,9 @@ public class ElasticSearch7SinkTester extends ElasticSearchSinkTester { super(schemaEnable); } + @Override - protected ElasticsearchContainer createSinkService(PulsarCluster cluster) { + protected ElasticsearchContainer createElasticContainer() { return new ElasticsearchContainer(ELASTICSEARCH_7) .withEnv("ES_JAVA_OPTS", "-Xms128m -Xmx256m"); } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch8SinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch8SinkTester.java index bb52c4ff03f..8e7617a82a5 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch8SinkTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch8SinkTester.java @@ -19,13 +19,12 @@ package org.apache.pulsar.tests.integration.io.sinks; import java.util.Optional; -import org.apache.pulsar.tests.integration.topologies.PulsarCluster; import org.testcontainers.elasticsearch.ElasticsearchContainer; public class ElasticSearch8SinkTester extends ElasticSearchSinkTester { public static final String ELASTICSEARCH_8 = Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE_V8")) - .orElse("docker.elastic.co/elasticsearch/elasticsearch:8.5.1"); + .orElse("docker.elastic.co/elasticsearch/elasticsearch:8.5.3"); public ElasticSearch8SinkTester(boolean schemaEnable) { @@ -33,7 +32,7 @@ public class ElasticSearch8SinkTester extends ElasticSearchSinkTester { } @Override - protected ElasticsearchContainer createSinkService(PulsarCluster cluster) { + protected ElasticsearchContainer createElasticContainer() { return new ElasticsearchContainer(ELASTICSEARCH_8) .withEnv("ES_JAVA_OPTS", "-Xms128m -Xmx256m") .withEnv("xpack.security.enabled", "false") diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java index 546dd1b9113..0784055d290 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java @@ -19,15 +19,6 @@ package org.apache.pulsar.tests.integration.io.sinks; import static org.testng.Assert.assertTrue; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - import co.elastic.clients.elasticsearch.ElasticsearchClient; import co.elastic.clients.elasticsearch.core.SearchRequest; import co.elastic.clients.elasticsearch.core.SearchResponse; @@ -35,6 +26,13 @@ import co.elastic.clients.json.jackson.JacksonJsonpMapper; import co.elastic.clients.transport.ElasticsearchTransport; import co.elastic.clients.transport.rest_client.RestClientTransport; import com.google.common.collect.ImmutableMap; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; import lombok.AllArgsConstructor; import lombok.Cleanup; import lombok.Data; @@ -46,6 +44,7 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.schema.KeyValueEncodingType; import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.tests.integration.topologies.PulsarCluster; import org.awaitility.Awaitility; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; @@ -100,6 +99,29 @@ public abstract class ElasticSearchSinkTester extends SinkTester<ElasticsearchCo } } + @Override + protected final ElasticsearchContainer createSinkService(PulsarCluster cluster) { + ElasticsearchContainer elasticContainer = createElasticContainer(); + configureElasticContainer(elasticContainer); + return elasticContainer; + } + + protected void configureElasticContainer(ElasticsearchContainer elasticContainer) { + if (!isOpenSearch()) { + elasticContainer.withEnv("ingest.geoip.downloader.enabled", "false"); + } + + // allow disk to fill up beyond default 90% threshold + elasticContainer.withEnv("cluster.routing.allocation.disk.threshold_enabled", "false"); + + elasticContainer.withLogConsumer(o -> log.info("elastic> {}", o.getUtf8String())); + } + + protected boolean isOpenSearch() { + return false; + } + + protected abstract ElasticsearchContainer createElasticContainer(); @Override public void prepareSink() throws Exception { diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java index 1e10cc4189c..75f0fdac6f9 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java @@ -18,9 +18,10 @@ */ package org.apache.pulsar.tests.integration.io.sinks; +import static org.testng.Assert.assertTrue; +import java.util.Map; import java.util.Optional; import org.apache.http.HttpHost; -import org.apache.pulsar.tests.integration.topologies.PulsarCluster; import org.awaitility.Awaitility; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; @@ -31,10 +32,6 @@ import org.opensearch.client.RestHighLevelClient; import org.testcontainers.elasticsearch.ElasticsearchContainer; import org.testcontainers.utility.DockerImageName; -import java.util.Map; - -import static org.testng.Assert.assertTrue; - public class OpenSearchSinkTester extends ElasticSearchSinkTester { public static final String OPENSEARCH = Optional.ofNullable(System.getenv("OPENSEARCH_IMAGE")) @@ -48,7 +45,7 @@ public class OpenSearchSinkTester extends ElasticSearchSinkTester { } @Override - protected ElasticsearchContainer createSinkService(PulsarCluster cluster) { + protected ElasticsearchContainer createElasticContainer() { DockerImageName dockerImageName = DockerImageName.parse(OPENSEARCH) .asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch"); return new ElasticsearchContainer(dockerImageName) @@ -57,6 +54,10 @@ public class OpenSearchSinkTester extends ElasticSearchSinkTester { .withEnv("plugins.security.disabled", "true"); } + protected boolean isOpenSearch() { + return true; + } + @Override public void prepareSink() throws Exception { RestClientBuilder builder = RestClient.builder( diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java index bd11b7d3873..ca5fe4b3852 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java @@ -167,7 +167,9 @@ public class PulsarCluster { .withEnv("journalSyncData", "false") .withEnv("journalMaxGroupWaitMSec", "0") .withEnv("clusterName", clusterName) + .withEnv("PULSAR_PREFIX_diskUsageWarnThreshold", "0.95") .withEnv("diskUsageThreshold", "0.99") + .withEnv("PULSAR_PREFIX_diskUsageLwmThreshold", "0.97") .withEnv("nettyMaxFrameSizeBytes", String.valueOf(spec.maxMessageSize)); if (spec.bookkeeperEnvs != null) { bookieContainer.withEnv(spec.bookkeeperEnvs);
