This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 64e60b56278 [improve][test] Disable disk usage threshold & geoip download and enable logging for Elastic Testcontainers (#20671)
64e60b56278 is described below

commit 64e60b562780a2c137439166420313b4525fde87
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Jun 28 21:10:05 2023 +0300

    [improve][test] Disable disk usage threshold & geoip download and enable logging for Elastic Testcontainers (#20671)
    
    (cherry picked from commit e97fe5b8ac376ecead0097b184f0f1436fce88e4)
    
    # Conflicts:
    #       tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
---
 .../io/elasticsearch/ElasticSearchTestBase.java    | 32 +++++++++++------
 .../io/sinks/ElasticSearch7SinkTester.java         |  4 +--
 .../io/sinks/ElasticSearch8SinkTester.java         |  5 ++-
 .../io/sinks/ElasticSearchSinkTester.java          | 40 +++++++++++++++++-----
 .../integration/io/sinks/OpenSearchSinkTester.java | 13 +++----
 .../integration/topologies/PulsarCluster.java      |  2 ++
 6 files changed, 66 insertions(+), 30 deletions(-)

diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchTestBase.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchTestBase.java
index 15791ccb623..506620c2db8 100644
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchTestBase.java
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchTestBase.java
@@ -18,10 +18,6 @@
  */
 package org.apache.pulsar.io.elasticsearch;
 
-import java.io.IOException;
-import java.util.Map;
-import java.util.Optional;
-
 import co.elastic.clients.elasticsearch.ElasticsearchClient;
 import co.elastic.clients.elasticsearch.security.CreateApiKeyRequest;
 import co.elastic.clients.elasticsearch.security.CreateApiKeyResponse;
@@ -29,6 +25,10 @@ import 
co.elastic.clients.elasticsearch.security.GetTokenRequest;
 import co.elastic.clients.elasticsearch.security.GetTokenResponse;
 import 
co.elastic.clients.elasticsearch.security.get_token.AccessTokenGrantType;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
 import 
org.apache.pulsar.io.elasticsearch.client.elastic.ElasticSearchJavaRestClient;
 import 
org.apache.pulsar.io.elasticsearch.client.opensearch.OpenSearchHighLevelRestClient;
 import org.opensearch.client.Request;
@@ -36,10 +36,11 @@ import org.opensearch.client.Response;
 import org.testcontainers.elasticsearch.ElasticsearchContainer;
 import org.testcontainers.utility.DockerImageName;
 
+@Slf4j
 public abstract class ElasticSearchTestBase {
 
     public static final String ELASTICSEARCH_8 = 
Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE_V8"))
-            .orElse("docker.elastic.co/elasticsearch/elasticsearch:8.5.1");
+            .orElse("docker.elastic.co/elasticsearch/elasticsearch:8.5.3");
 
     public static final String ELASTICSEARCH_7 = 
Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE_V7"))
             .orElse("docker.elastic.co/elasticsearch/elasticsearch:7.17.7");
@@ -54,17 +55,28 @@ public abstract class ElasticSearchTestBase {
     }
 
     protected ElasticsearchContainer createElasticsearchContainer() {
+        ElasticsearchContainer elasticsearchContainer;
         if (elasticImageName.equals(OPENSEARCH)) {
             DockerImageName dockerImageName = 
DockerImageName.parse(OPENSEARCH).asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch");
-            return new ElasticsearchContainer(dockerImageName)
+            elasticsearchContainer = new 
ElasticsearchContainer(dockerImageName)
                     .withEnv("OPENSEARCH_JAVA_OPTS", "-Xms128m -Xmx256m")
                     .withEnv("bootstrap.memory_lock", "true")
                     .withEnv("plugins.security.disabled", "true");
+        } else {
+            elasticsearchContainer = new 
ElasticsearchContainer(elasticImageName)
+                    .withEnv("ES_JAVA_OPTS", "-Xms128m -Xmx256m")
+                    .withEnv("xpack.security.enabled", "false")
+                    .withEnv("xpack.security.http.ssl.enabled", "false");
+        }
+        configureElasticContainer(elasticsearchContainer);
+        return elasticsearchContainer;
+    }
+
+    protected void configureElasticContainer(ElasticsearchContainer 
elasticContainer) {
+        if (getCompatibilityMode() != 
ElasticSearchConfig.CompatibilityMode.OPENSEARCH) {
+            elasticContainer.withEnv("ingest.geoip.downloader.enabled", 
"false");
         }
-        return new ElasticsearchContainer(elasticImageName)
-                .withEnv("ES_JAVA_OPTS", "-Xms128m -Xmx256m")
-                .withEnv("xpack.security.enabled", "false")
-                .withEnv("xpack.security.http.ssl.enabled", "false");
+        elasticContainer.withLogConsumer(o -> log.info("elastic> {}", 
o.getUtf8String()));
     }
 
     protected ElasticSearchConfig.CompatibilityMode getCompatibilityMode() {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch7SinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch7SinkTester.java
index f98471abdf5..440f7cd01b3 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch7SinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch7SinkTester.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.tests.integration.io.sinks;
 
 import java.util.Optional;
-import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.testcontainers.elasticsearch.ElasticsearchContainer;
 
 public class ElasticSearch7SinkTester extends ElasticSearchSinkTester {
@@ -32,8 +31,9 @@ public class ElasticSearch7SinkTester extends 
ElasticSearchSinkTester {
         super(schemaEnable);
     }
 
+
     @Override
-    protected ElasticsearchContainer createSinkService(PulsarCluster cluster) {
+    protected ElasticsearchContainer createElasticContainer() {
         return new ElasticsearchContainer(ELASTICSEARCH_7)
                 .withEnv("ES_JAVA_OPTS", "-Xms128m -Xmx256m");
     }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch8SinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch8SinkTester.java
index 9fd29ec1e90..349bb61101f 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch8SinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch8SinkTester.java
@@ -19,13 +19,12 @@
 package org.apache.pulsar.tests.integration.io.sinks;
 
 import java.util.Optional;
-import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.testcontainers.elasticsearch.ElasticsearchContainer;
 
 public class ElasticSearch8SinkTester extends ElasticSearchSinkTester {
 
     public static final String ELASTICSEARCH_8 = 
Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE_V8"))
-            .orElse("docker.elastic.co/elasticsearch/elasticsearch:8.5.1");
+            .orElse("docker.elastic.co/elasticsearch/elasticsearch:8.5.3");
 
 
     public ElasticSearch8SinkTester(boolean schemaEnable) {
@@ -33,7 +32,7 @@ public class ElasticSearch8SinkTester extends 
ElasticSearchSinkTester {
     }
 
     @Override
-    protected ElasticsearchContainer createSinkService(PulsarCluster cluster) {
+    protected ElasticsearchContainer createElasticContainer() {
         return new ElasticsearchContainer(ELASTICSEARCH_8)
                 .withEnv("ES_JAVA_OPTS", "-Xms128m -Xmx256m")
                 .withEnv("xpack.security.enabled", "false")
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java
index 52a44fd86db..b931de44b4d 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java
@@ -19,15 +19,6 @@
 package org.apache.pulsar.tests.integration.io.sinks;
 
 import static org.testng.Assert.assertTrue;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 import co.elastic.clients.elasticsearch.ElasticsearchClient;
 import co.elastic.clients.elasticsearch.core.SearchRequest;
 import co.elastic.clients.elasticsearch.core.SearchResponse;
@@ -35,6 +26,13 @@ import co.elastic.clients.json.jackson.JacksonJsonpMapper;
 import co.elastic.clients.transport.ElasticsearchTransport;
 import co.elastic.clients.transport.rest_client.RestClientTransport;
 import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import lombok.AllArgsConstructor;
 import lombok.Cleanup;
 import lombok.Data;
@@ -46,6 +44,7 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.awaitility.Awaitility;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestClientBuilder;
@@ -100,6 +99,29 @@ public abstract class ElasticSearchSinkTester extends 
SinkTester<ElasticsearchCo
         }
     }
 
+    @Override
+    protected final ElasticsearchContainer createSinkService(PulsarCluster 
cluster) {
+        ElasticsearchContainer elasticContainer = createElasticContainer();
+        configureElasticContainer(elasticContainer);
+        return elasticContainer;
+    }
+
+    protected void configureElasticContainer(ElasticsearchContainer 
elasticContainer) {
+        if (!isOpenSearch()) {
+            elasticContainer.withEnv("ingest.geoip.downloader.enabled", 
"false");
+        }
+
+        // allow disk to fill up beyond default 90% threshold
+        
elasticContainer.withEnv("cluster.routing.allocation.disk.threshold_enabled", 
"false");
+
+        elasticContainer.withLogConsumer(o -> log.info("elastic> {}", 
o.getUtf8String()));
+    }
+
+    protected boolean isOpenSearch() {
+        return false;
+    }
+
+    protected abstract ElasticsearchContainer createElasticContainer();
 
     @Override
     public void prepareSink() throws Exception {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java
index 0d496b84a8f..f98641d696d 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java
@@ -18,9 +18,10 @@
  */
 package org.apache.pulsar.tests.integration.io.sinks;
 
+import static org.testng.Assert.assertTrue;
+import java.util.Map;
 import java.util.Optional;
 import org.apache.http.HttpHost;
-import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.awaitility.Awaitility;
 import org.opensearch.action.search.SearchRequest;
 import org.opensearch.action.search.SearchResponse;
@@ -31,10 +32,6 @@ import org.opensearch.client.RestHighLevelClient;
 import org.testcontainers.elasticsearch.ElasticsearchContainer;
 import org.testcontainers.utility.DockerImageName;
 
-import java.util.Map;
-
-import static org.testng.Assert.assertTrue;
-
 public class OpenSearchSinkTester extends ElasticSearchSinkTester {
 
     public static final String OPENSEARCH = 
Optional.ofNullable(System.getenv("OPENSEARCH_IMAGE"))
@@ -48,7 +45,7 @@ public class OpenSearchSinkTester extends 
ElasticSearchSinkTester {
     }
 
     @Override
-    protected ElasticsearchContainer createSinkService(PulsarCluster cluster) {
+    protected ElasticsearchContainer createElasticContainer() {
         DockerImageName dockerImageName = DockerImageName.parse(OPENSEARCH)
                 
.asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch");
         return new ElasticsearchContainer(dockerImageName)
@@ -57,6 +54,10 @@ public class OpenSearchSinkTester extends 
ElasticSearchSinkTester {
                 .withEnv("plugins.security.disabled", "true");
     }
 
+    protected boolean isOpenSearch() {
+        return true;
+    }
+
     @Override
     public void prepareSink() throws Exception {
         RestClientBuilder builder = RestClient.builder(
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index 63fbb281341..d7d8d1423cb 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -155,7 +155,9 @@ public class PulsarCluster {
                         .withEnv("journalSyncData", "false")
                         .withEnv("journalMaxGroupWaitMSec", "0")
                         .withEnv("clusterName", clusterName)
+                        .withEnv("PULSAR_PREFIX_diskUsageWarnThreshold", 
"0.95")
                         .withEnv("diskUsageThreshold", "0.99")
+                        .withEnv("PULSAR_PREFIX_diskUsageLwmThreshold", "0.97")
                         .withEnv("nettyMaxFrameSizeBytes", "" + 
spec.maxMessageSize)
                 )
         );

Reply via email to