This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f60bcd  [Tests] Reduce Pulsar IO integration test RAM requirements to 
prevent system OOM (exit code 137) in GitHub Actions CI (#12547)
7f60bcd is described below

commit 7f60bcd9caa8356ad59a85ff6820269b118e3077
Author: Lari Hotari <[email protected]>
AuthorDate: Sat Oct 30 19:14:22 2021 +0300

    [Tests] Reduce Pulsar IO integration test RAM requirements to prevent 
system OOM (exit code 137) in GitHub Actions CI (#12547)
    
    * Limit RAM to 128MB for Pulsar IO Sinks and Sources
    
    * Reduce ElasticSearchContainer memory requirements
---
 .../integration/containers/ElasticSearchContainer.java    |  8 ++++----
 .../pulsar/tests/integration/io/PulsarIOTestRunner.java   | 15 ++++++++-------
 .../tests/integration/io/sinks/PulsarIOSinkRunner.java    | 14 +++++++++-----
 .../integration/io/sources/PulsarIOSourceRunner.java      |  8 +++++---
 4 files changed, 26 insertions(+), 19 deletions(-)

diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ElasticSearchContainer.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ElasticSearchContainer.java
index d0bbe9d..82deb2a 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ElasticSearchContainer.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ElasticSearchContainer.java
@@ -23,7 +23,7 @@ import 
org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
 import java.util.Optional;
 
 public class ElasticSearchContainer extends 
ChaosContainer<ElasticSearchContainer> {
-    
+
     public static final String NAME = "ElasticSearch";
     static final Integer[] PORTS = { 9200, 9300 };
 
@@ -31,16 +31,16 @@ public class ElasticSearchContainer extends 
ChaosContainer<ElasticSearchContaine
             .orElse("docker.elastic.co/elasticsearch/elasticsearch:7.15.0");
 
     public ElasticSearchContainer(String clusterName) {
-        super(clusterName, IMAGE_NAME);       
+        super(clusterName, IMAGE_NAME);
     }
-    
+
     @Override
     protected void configure() {
         super.configure();
         this.withNetworkAliases(NAME)
             .withExposedPorts(PORTS)
             .withEnv("discovery.type", "single-node")
-            .withEnv("ES_JAVA_OPTS", "-Xms2g -Xmx2g")
+            .withEnv("ES_JAVA_OPTS", "-Xms512m -Xmx1500m")
             .withCreateContainerCmdModifier(createContainerCmd -> {
                 createContainerCmd.withHostName(NAME);
                 createContainerCmd.withName(clusterName + "-" + NAME);
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOTestRunner.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOTestRunner.java
index 7d95d16..2269e49 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOTestRunner.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOTestRunner.java
@@ -38,7 +38,8 @@ import net.jodah.failsafe.RetryPolicy;
 
 @Slf4j
 public abstract class PulsarIOTestRunner {
-
+    static final long MB = 1048576L;
+    public static final long RUNTIME_INSTANCE_RAM_BYTES = 128 * MB;
     final Duration ONE_MINUTE = Duration.ofMinutes(1);
     final Duration TEN_SECONDS = Duration.ofSeconds(10);
 
@@ -47,15 +48,15 @@ public abstract class PulsarIOTestRunner {
             .withMaxDuration(ONE_MINUTE)
             .withDelay(TEN_SECONDS)
             .onRetry(e -> log.error("Retry ... "));
-    
+
     protected PulsarCluster pulsarCluster;
     protected String functionRuntimeType;
-    
+
     protected PulsarIOTestRunner(PulsarCluster cluster, String 
functionRuntimeType) {
       this.pulsarCluster = cluster;
       this.functionRuntimeType = functionRuntimeType;
     }
-    
+
     @SuppressWarnings("rawtypes")
        protected Schema getSchema(boolean jsonWithEnvelope) {
         if (jsonWithEnvelope) {
@@ -64,7 +65,7 @@ public abstract class PulsarIOTestRunner {
             return KeyValueSchemaImpl.of(Schema.AUTO_CONSUME(), 
Schema.AUTO_CONSUME(), KeyValueEncodingType.SEPARATED);
         }
     }
-    
+
     protected <T> void ensureSubscriptionCreated(String inputTopicName,
                                                       String subscriptionName,
                                                       Schema<T> 
inputTopicSchema)
@@ -81,7 +82,7 @@ public abstract class PulsarIOTestRunner {
             }
         }
     }
-    
+
     protected Map<String, String> produceMessagesToInputTopic(String 
inputTopicName,
                                                               int numMessages, 
SinkTester<?> tester) throws Exception {
 
@@ -92,5 +93,5 @@ public abstract class PulsarIOTestRunner {
         LinkedHashMap<String, String> kvs = new LinkedHashMap<>();
         tester.produceMessage(numMessages, client, inputTopicName, kvs);
         return kvs;
-    }  
+    }
 }
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarIOSinkRunner.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarIOSinkRunner.java
index 4b055d1..b6500e36 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarIOSinkRunner.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarIOSinkRunner.java
@@ -155,7 +155,8 @@ public class PulsarIOSinkRunner extends PulsarIOTestRunner {
                     "--name", sinkName,
                     "--sink-type", tester.sinkType().getValue().toLowerCase(),
                     "--sinkConfig", new Gson().toJson(tester.sinkConfig()),
-                    "--inputs", inputTopicName
+                    "--inputs", inputTopicName,
+                    "--ram", String.valueOf(RUNTIME_INSTANCE_RAM_BYTES)
             };
         } else {
             commands = new String[] {
@@ -167,7 +168,8 @@ public class PulsarIOSinkRunner extends PulsarIOTestRunner {
                     "--archive", tester.getSinkArchive(),
                     "--classname", tester.getSinkClassName(),
                     "--sinkConfig", new Gson().toJson(tester.sinkConfig()),
-                    "--inputs", inputTopicName
+                    "--inputs", inputTopicName,
+                    "--ram", String.valueOf(RUNTIME_INSTANCE_RAM_BYTES)
             };
         }
         log.info("Run command : {}", StringUtils.join(commands, ' '));
@@ -193,7 +195,8 @@ public class PulsarIOSinkRunner extends PulsarIOTestRunner {
                     "--sink-type", tester.sinkType().getValue().toLowerCase(),
                     "--sinkConfig", new Gson().toJson(tester.sinkConfig()),
                     "--inputs", inputTopicName,
-                    "--parallelism", "2"
+                    "--parallelism", "2",
+                    "--ram", String.valueOf(RUNTIME_INSTANCE_RAM_BYTES)
             };
         } else {
             commands = new String[] {
@@ -206,7 +209,8 @@ public class PulsarIOSinkRunner extends PulsarIOTestRunner {
                     "--classname", tester.getSinkClassName(),
                     "--sinkConfig", new Gson().toJson(tester.sinkConfig()),
                     "--inputs", inputTopicName,
-                    "--parallelism", "2"
+                    "--parallelism", "2",
+                    "--ram", String.valueOf(RUNTIME_INSTANCE_RAM_BYTES)
             };
         }
         log.info("Run command : {}", StringUtils.join(commands, ' '));
@@ -337,7 +341,7 @@ public class PulsarIOSinkRunner extends PulsarIOTestRunner {
         
assertEquals(sinkStatus.getInstances().get(0).getStatus().getNumRestarts(), 0);
         
assertEquals(sinkStatus.getInstances().get(0).getStatus().getLatestSystemExceptions().size(),
 0);
     }
-    
+
     // This for JdbcPostgresSinkTester
     protected Map<String, String> 
produceSchemaInsertMessagesToInputTopic(String inputTopicName,
                                                                           int 
numMessages,
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/PulsarIOSourceRunner.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/PulsarIOSourceRunner.java
index ddad6ed..d1e5049 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/PulsarIOSourceRunner.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/PulsarIOSourceRunner.java
@@ -49,7 +49,7 @@ import net.jodah.failsafe.Failsafe;
 
 @Slf4j
 public class PulsarIOSourceRunner extends PulsarIOTestRunner {
-       
+
     public PulsarIOSourceRunner(PulsarCluster cluster, String 
functionRuntimeType) {
                super(cluster, functionRuntimeType);
        }
@@ -131,7 +131,8 @@ public class PulsarIOSourceRunner extends 
PulsarIOTestRunner {
             "--name", sourceName,
             "--source-type", tester.sourceType(),
             "--sourceConfig", new Gson().toJson(tester.sourceConfig()),
-            "--destinationTopicName", outputTopicName
+            "--destinationTopicName", outputTopicName,
+            "--ram", String.valueOf(RUNTIME_INSTANCE_RAM_BYTES)
         };
 
         log.info("Run command : {}", StringUtils.join(commands, ' '));
@@ -156,7 +157,8 @@ public class PulsarIOSourceRunner extends 
PulsarIOTestRunner {
                 "--source-type", tester.sourceType(),
                 "--sourceConfig", new Gson().toJson(tester.sourceConfig()),
                 "--destinationTopicName", outputTopicName,
-                "--parallelism", "2"
+                "--parallelism", "2",
+                "--ram", String.valueOf(RUNTIME_INSTANCE_RAM_BYTES)
         };
 
         log.info("Run command : {}", StringUtils.join(commands, ' '));

Reply via email to