This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 64e60b56278 [improve][test] Disable disk usage threshold & geoip
download and enable logging for Elastic Testcontainers (#20671)
64e60b56278 is described below
commit 64e60b562780a2c137439166420313b4525fde87
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Jun 28 21:10:05 2023 +0300
[improve][test] Disable disk usage threshold & geoip download and enable
logging for Elastic Testcontainers (#20671)
(cherry picked from commit e97fe5b8ac376ecead0097b184f0f1436fce88e4)
# Conflicts:
#
tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
---
.../io/elasticsearch/ElasticSearchTestBase.java | 32 +++++++++++------
.../io/sinks/ElasticSearch7SinkTester.java | 4 +--
.../io/sinks/ElasticSearch8SinkTester.java | 5 ++-
.../io/sinks/ElasticSearchSinkTester.java | 40 +++++++++++++++++-----
.../integration/io/sinks/OpenSearchSinkTester.java | 13 +++----
.../integration/topologies/PulsarCluster.java | 2 ++
6 files changed, 66 insertions(+), 30 deletions(-)
diff --git
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchTestBase.java
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchTestBase.java
index 15791ccb623..506620c2db8 100644
---
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchTestBase.java
+++
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchTestBase.java
@@ -18,10 +18,6 @@
*/
package org.apache.pulsar.io.elasticsearch;
-import java.io.IOException;
-import java.util.Map;
-import java.util.Optional;
-
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.security.CreateApiKeyRequest;
import co.elastic.clients.elasticsearch.security.CreateApiKeyResponse;
@@ -29,6 +25,10 @@ import
co.elastic.clients.elasticsearch.security.GetTokenRequest;
import co.elastic.clients.elasticsearch.security.GetTokenResponse;
import
co.elastic.clients.elasticsearch.security.get_token.AccessTokenGrantType;
import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
import
org.apache.pulsar.io.elasticsearch.client.elastic.ElasticSearchJavaRestClient;
import
org.apache.pulsar.io.elasticsearch.client.opensearch.OpenSearchHighLevelRestClient;
import org.opensearch.client.Request;
@@ -36,10 +36,11 @@ import org.opensearch.client.Response;
import org.testcontainers.elasticsearch.ElasticsearchContainer;
import org.testcontainers.utility.DockerImageName;
+@Slf4j
public abstract class ElasticSearchTestBase {
public static final String ELASTICSEARCH_8 =
Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE_V8"))
- .orElse("docker.elastic.co/elasticsearch/elasticsearch:8.5.1");
+ .orElse("docker.elastic.co/elasticsearch/elasticsearch:8.5.3");
public static final String ELASTICSEARCH_7 =
Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE_V7"))
.orElse("docker.elastic.co/elasticsearch/elasticsearch:7.17.7");
@@ -54,17 +55,28 @@ public abstract class ElasticSearchTestBase {
}
protected ElasticsearchContainer createElasticsearchContainer() {
+ ElasticsearchContainer elasticsearchContainer;
if (elasticImageName.equals(OPENSEARCH)) {
DockerImageName dockerImageName =
DockerImageName.parse(OPENSEARCH).asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch");
- return new ElasticsearchContainer(dockerImageName)
+ elasticsearchContainer = new
ElasticsearchContainer(dockerImageName)
.withEnv("OPENSEARCH_JAVA_OPTS", "-Xms128m -Xmx256m")
.withEnv("bootstrap.memory_lock", "true")
.withEnv("plugins.security.disabled", "true");
+ } else {
+ elasticsearchContainer = new
ElasticsearchContainer(elasticImageName)
+ .withEnv("ES_JAVA_OPTS", "-Xms128m -Xmx256m")
+ .withEnv("xpack.security.enabled", "false")
+ .withEnv("xpack.security.http.ssl.enabled", "false");
+ }
+ configureElasticContainer(elasticsearchContainer);
+ return elasticsearchContainer;
+ }
+
+ protected void configureElasticContainer(ElasticsearchContainer
elasticContainer) {
+ if (getCompatibilityMode() !=
ElasticSearchConfig.CompatibilityMode.OPENSEARCH) {
+ elasticContainer.withEnv("ingest.geoip.downloader.enabled",
"false");
}
- return new ElasticsearchContainer(elasticImageName)
- .withEnv("ES_JAVA_OPTS", "-Xms128m -Xmx256m")
- .withEnv("xpack.security.enabled", "false")
- .withEnv("xpack.security.http.ssl.enabled", "false");
+ elasticContainer.withLogConsumer(o -> log.info("elastic> {}",
o.getUtf8String()));
}
protected ElasticSearchConfig.CompatibilityMode getCompatibilityMode() {
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch7SinkTester.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch7SinkTester.java
index f98471abdf5..440f7cd01b3 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch7SinkTester.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch7SinkTester.java
@@ -19,7 +19,6 @@
package org.apache.pulsar.tests.integration.io.sinks;
import java.util.Optional;
-import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.testcontainers.elasticsearch.ElasticsearchContainer;
public class ElasticSearch7SinkTester extends ElasticSearchSinkTester {
@@ -32,8 +31,9 @@ public class ElasticSearch7SinkTester extends
ElasticSearchSinkTester {
super(schemaEnable);
}
+
@Override
- protected ElasticsearchContainer createSinkService(PulsarCluster cluster) {
+ protected ElasticsearchContainer createElasticContainer() {
return new ElasticsearchContainer(ELASTICSEARCH_7)
.withEnv("ES_JAVA_OPTS", "-Xms128m -Xmx256m");
}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch8SinkTester.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch8SinkTester.java
index 9fd29ec1e90..349bb61101f 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch8SinkTester.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch8SinkTester.java
@@ -19,13 +19,12 @@
package org.apache.pulsar.tests.integration.io.sinks;
import java.util.Optional;
-import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.testcontainers.elasticsearch.ElasticsearchContainer;
public class ElasticSearch8SinkTester extends ElasticSearchSinkTester {
public static final String ELASTICSEARCH_8 =
Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE_V8"))
- .orElse("docker.elastic.co/elasticsearch/elasticsearch:8.5.1");
+ .orElse("docker.elastic.co/elasticsearch/elasticsearch:8.5.3");
public ElasticSearch8SinkTester(boolean schemaEnable) {
@@ -33,7 +32,7 @@ public class ElasticSearch8SinkTester extends
ElasticSearchSinkTester {
}
@Override
- protected ElasticsearchContainer createSinkService(PulsarCluster cluster) {
+ protected ElasticsearchContainer createElasticContainer() {
return new ElasticsearchContainer(ELASTICSEARCH_8)
.withEnv("ES_JAVA_OPTS", "-Xms128m -Xmx256m")
.withEnv("xpack.security.enabled", "false")
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java
index 52a44fd86db..b931de44b4d 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java
@@ -19,15 +19,6 @@
package org.apache.pulsar.tests.integration.io.sinks;
import static org.testng.Assert.assertTrue;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.SearchRequest;
import co.elastic.clients.elasticsearch.core.SearchResponse;
@@ -35,6 +26,13 @@ import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import lombok.AllArgsConstructor;
import lombok.Cleanup;
import lombok.Data;
@@ -46,6 +44,7 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.awaitility.Awaitility;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
@@ -100,6 +99,29 @@ public abstract class ElasticSearchSinkTester extends
SinkTester<ElasticsearchCo
}
}
+ @Override
+ protected final ElasticsearchContainer createSinkService(PulsarCluster
cluster) {
+ ElasticsearchContainer elasticContainer = createElasticContainer();
+ configureElasticContainer(elasticContainer);
+ return elasticContainer;
+ }
+
+ protected void configureElasticContainer(ElasticsearchContainer
elasticContainer) {
+ if (!isOpenSearch()) {
+ elasticContainer.withEnv("ingest.geoip.downloader.enabled",
"false");
+ }
+
+ // allow disk to fill up beyond default 90% threshold
+
elasticContainer.withEnv("cluster.routing.allocation.disk.threshold_enabled",
"false");
+
+ elasticContainer.withLogConsumer(o -> log.info("elastic> {}",
o.getUtf8String()));
+ }
+
+ protected boolean isOpenSearch() {
+ return false;
+ }
+
+ protected abstract ElasticsearchContainer createElasticContainer();
@Override
public void prepareSink() throws Exception {
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java
index 0d496b84a8f..f98641d696d 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java
@@ -18,9 +18,10 @@
*/
package org.apache.pulsar.tests.integration.io.sinks;
+import static org.testng.Assert.assertTrue;
+import java.util.Map;
import java.util.Optional;
import org.apache.http.HttpHost;
-import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.awaitility.Awaitility;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
@@ -31,10 +32,6 @@ import org.opensearch.client.RestHighLevelClient;
import org.testcontainers.elasticsearch.ElasticsearchContainer;
import org.testcontainers.utility.DockerImageName;
-import java.util.Map;
-
-import static org.testng.Assert.assertTrue;
-
public class OpenSearchSinkTester extends ElasticSearchSinkTester {
public static final String OPENSEARCH =
Optional.ofNullable(System.getenv("OPENSEARCH_IMAGE"))
@@ -48,7 +45,7 @@ public class OpenSearchSinkTester extends
ElasticSearchSinkTester {
}
@Override
- protected ElasticsearchContainer createSinkService(PulsarCluster cluster) {
+ protected ElasticsearchContainer createElasticContainer() {
DockerImageName dockerImageName = DockerImageName.parse(OPENSEARCH)
.asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch");
return new ElasticsearchContainer(dockerImageName)
@@ -57,6 +54,10 @@ public class OpenSearchSinkTester extends
ElasticSearchSinkTester {
.withEnv("plugins.security.disabled", "true");
}
+ protected boolean isOpenSearch() {
+ return true;
+ }
+
@Override
public void prepareSink() throws Exception {
RestClientBuilder builder = RestClient.builder(
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index 63fbb281341..d7d8d1423cb 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -155,7 +155,9 @@ public class PulsarCluster {
.withEnv("journalSyncData", "false")
.withEnv("journalMaxGroupWaitMSec", "0")
.withEnv("clusterName", clusterName)
+ .withEnv("PULSAR_PREFIX_diskUsageWarnThreshold",
"0.95")
.withEnv("diskUsageThreshold", "0.99")
+ .withEnv("PULSAR_PREFIX_diskUsageLwmThreshold", "0.97")
.withEnv("nettyMaxFrameSizeBytes", "" +
spec.maxMessageSize)
)
);