This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7f60bcd [Tests] Reduce Pulsar IO integration test RAM requirements to
prevent system OOM (exit code 137) in GitHub Actions CI (#12547)
7f60bcd is described below
commit 7f60bcd9caa8356ad59a85ff6820269b118e3077
Author: Lari Hotari <[email protected]>
AuthorDate: Sat Oct 30 19:14:22 2021 +0300
[Tests] Reduce Pulsar IO integration test RAM requirements to prevent
system OOM (exit code 137) in GitHub Actions CI (#12547)
* Limit RAM to 128MB for Pulsar IO Sinks and Sources
* Reduce ElasticSearchContainer memory requirements
---
.../integration/containers/ElasticSearchContainer.java | 8 ++++----
.../pulsar/tests/integration/io/PulsarIOTestRunner.java | 15 ++++++++-------
.../tests/integration/io/sinks/PulsarIOSinkRunner.java | 14 +++++++++-----
.../integration/io/sources/PulsarIOSourceRunner.java | 8 +++++---
4 files changed, 26 insertions(+), 19 deletions(-)
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ElasticSearchContainer.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ElasticSearchContainer.java
index d0bbe9d..82deb2a 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ElasticSearchContainer.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ElasticSearchContainer.java
@@ -23,7 +23,7 @@ import
org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
import java.util.Optional;
public class ElasticSearchContainer extends
ChaosContainer<ElasticSearchContainer> {
-
+
public static final String NAME = "ElasticSearch";
static final Integer[] PORTS = { 9200, 9300 };
@@ -31,16 +31,16 @@ public class ElasticSearchContainer extends
ChaosContainer<ElasticSearchContaine
.orElse("docker.elastic.co/elasticsearch/elasticsearch:7.15.0");
public ElasticSearchContainer(String clusterName) {
- super(clusterName, IMAGE_NAME);
+ super(clusterName, IMAGE_NAME);
}
-
+
@Override
protected void configure() {
super.configure();
this.withNetworkAliases(NAME)
.withExposedPorts(PORTS)
.withEnv("discovery.type", "single-node")
- .withEnv("ES_JAVA_OPTS", "-Xms2g -Xmx2g")
+ .withEnv("ES_JAVA_OPTS", "-Xms512m -Xmx1500m")
.withCreateContainerCmdModifier(createContainerCmd -> {
createContainerCmd.withHostName(NAME);
createContainerCmd.withName(clusterName + "-" + NAME);
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOTestRunner.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOTestRunner.java
index 7d95d16..2269e49 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOTestRunner.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOTestRunner.java
@@ -38,7 +38,8 @@ import net.jodah.failsafe.RetryPolicy;
@Slf4j
public abstract class PulsarIOTestRunner {
-
+ static final long MB = 1048576L;
+ public static final long RUNTIME_INSTANCE_RAM_BYTES = 128 * MB;
final Duration ONE_MINUTE = Duration.ofMinutes(1);
final Duration TEN_SECONDS = Duration.ofSeconds(10);
@@ -47,15 +48,15 @@ public abstract class PulsarIOTestRunner {
.withMaxDuration(ONE_MINUTE)
.withDelay(TEN_SECONDS)
.onRetry(e -> log.error("Retry ... "));
-
+
protected PulsarCluster pulsarCluster;
protected String functionRuntimeType;
-
+
protected PulsarIOTestRunner(PulsarCluster cluster, String
functionRuntimeType) {
this.pulsarCluster = cluster;
this.functionRuntimeType = functionRuntimeType;
}
-
+
@SuppressWarnings("rawtypes")
protected Schema getSchema(boolean jsonWithEnvelope) {
if (jsonWithEnvelope) {
@@ -64,7 +65,7 @@ public abstract class PulsarIOTestRunner {
return KeyValueSchemaImpl.of(Schema.AUTO_CONSUME(),
Schema.AUTO_CONSUME(), KeyValueEncodingType.SEPARATED);
}
}
-
+
protected <T> void ensureSubscriptionCreated(String inputTopicName,
String subscriptionName,
Schema<T>
inputTopicSchema)
@@ -81,7 +82,7 @@ public abstract class PulsarIOTestRunner {
}
}
}
-
+
protected Map<String, String> produceMessagesToInputTopic(String
inputTopicName,
int numMessages,
SinkTester<?> tester) throws Exception {
@@ -92,5 +93,5 @@ public abstract class PulsarIOTestRunner {
LinkedHashMap<String, String> kvs = new LinkedHashMap<>();
tester.produceMessage(numMessages, client, inputTopicName, kvs);
return kvs;
- }
+ }
}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarIOSinkRunner.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarIOSinkRunner.java
index 4b055d1..b6500e36 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarIOSinkRunner.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarIOSinkRunner.java
@@ -155,7 +155,8 @@ public class PulsarIOSinkRunner extends PulsarIOTestRunner {
"--name", sinkName,
"--sink-type", tester.sinkType().getValue().toLowerCase(),
"--sinkConfig", new Gson().toJson(tester.sinkConfig()),
- "--inputs", inputTopicName
+ "--inputs", inputTopicName,
+ "--ram", String.valueOf(RUNTIME_INSTANCE_RAM_BYTES)
};
} else {
commands = new String[] {
@@ -167,7 +168,8 @@ public class PulsarIOSinkRunner extends PulsarIOTestRunner {
"--archive", tester.getSinkArchive(),
"--classname", tester.getSinkClassName(),
"--sinkConfig", new Gson().toJson(tester.sinkConfig()),
- "--inputs", inputTopicName
+ "--inputs", inputTopicName,
+ "--ram", String.valueOf(RUNTIME_INSTANCE_RAM_BYTES)
};
}
log.info("Run command : {}", StringUtils.join(commands, ' '));
@@ -193,7 +195,8 @@ public class PulsarIOSinkRunner extends PulsarIOTestRunner {
"--sink-type", tester.sinkType().getValue().toLowerCase(),
"--sinkConfig", new Gson().toJson(tester.sinkConfig()),
"--inputs", inputTopicName,
- "--parallelism", "2"
+ "--parallelism", "2",
+ "--ram", String.valueOf(RUNTIME_INSTANCE_RAM_BYTES)
};
} else {
commands = new String[] {
@@ -206,7 +209,8 @@ public class PulsarIOSinkRunner extends PulsarIOTestRunner {
"--classname", tester.getSinkClassName(),
"--sinkConfig", new Gson().toJson(tester.sinkConfig()),
"--inputs", inputTopicName,
- "--parallelism", "2"
+ "--parallelism", "2",
+ "--ram", String.valueOf(RUNTIME_INSTANCE_RAM_BYTES)
};
}
log.info("Run command : {}", StringUtils.join(commands, ' '));
@@ -337,7 +341,7 @@ public class PulsarIOSinkRunner extends PulsarIOTestRunner {
assertEquals(sinkStatus.getInstances().get(0).getStatus().getNumRestarts(), 0);
assertEquals(sinkStatus.getInstances().get(0).getStatus().getLatestSystemExceptions().size(),
0);
}
-
+
// This for JdbcPostgresSinkTester
protected Map<String, String>
produceSchemaInsertMessagesToInputTopic(String inputTopicName,
int
numMessages,
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/PulsarIOSourceRunner.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/PulsarIOSourceRunner.java
index ddad6ed..d1e5049 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/PulsarIOSourceRunner.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/PulsarIOSourceRunner.java
@@ -49,7 +49,7 @@ import net.jodah.failsafe.Failsafe;
@Slf4j
public class PulsarIOSourceRunner extends PulsarIOTestRunner {
-
+
public PulsarIOSourceRunner(PulsarCluster cluster, String
functionRuntimeType) {
super(cluster, functionRuntimeType);
}
@@ -131,7 +131,8 @@ public class PulsarIOSourceRunner extends
PulsarIOTestRunner {
"--name", sourceName,
"--source-type", tester.sourceType(),
"--sourceConfig", new Gson().toJson(tester.sourceConfig()),
- "--destinationTopicName", outputTopicName
+ "--destinationTopicName", outputTopicName,
+ "--ram", String.valueOf(RUNTIME_INSTANCE_RAM_BYTES)
};
log.info("Run command : {}", StringUtils.join(commands, ' '));
@@ -156,7 +157,8 @@ public class PulsarIOSourceRunner extends
PulsarIOTestRunner {
"--source-type", tester.sourceType(),
"--sourceConfig", new Gson().toJson(tester.sourceConfig()),
"--destinationTopicName", outputTopicName,
- "--parallelism", "2"
+ "--parallelism", "2",
+ "--ram", String.valueOf(RUNTIME_INSTANCE_RAM_BYTES)
};
log.info("Run command : {}", StringUtils.join(commands, ' '));