sijie closed pull request #2233: [integration tests] Return exit code as part
of execution result of running a command in testcontainers
URL: https://github.com/apache/incubator-pulsar/pull/2233
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as GitHub would otherwise hide it once the pull request is merged):
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
index 7b4d0d00dd..e69b0dd4a5 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
@@ -18,14 +18,15 @@
*/
package org.apache.pulsar.tests.integration.cli;
+import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import org.apache.pulsar.tests.integration.containers.BrokerContainer;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
-import org.testcontainers.containers.Container.ExecResult;
-import org.testng.Assert;
import org.testng.annotations.Test;
/**
@@ -37,7 +38,8 @@
public void testDeprecatedCommands() throws Exception {
String tenantName = "test-deprecated-commands";
- ExecResult result = pulsarCluster.runAdminCommandOnAnyBroker("--help");
+ ContainerExecResult result =
pulsarCluster.runAdminCommandOnAnyBroker("--help");
+ assertEquals(0, result.getExitCode());
assertFalse(result.getStdout().isEmpty());
assertFalse(result.getStdout().contains("Usage: properties "));
result = pulsarCluster.runAdminCommandOnAnyBroker(
@@ -63,7 +65,7 @@ public void testCreateSubscriptionCommand() throws Exception {
int i = 0;
for (BrokerContainer container : pulsarCluster.getBrokers()) {
- ExecResult result = container.execCmd(
+ ContainerExecResult result = container.execCmd(
PulsarCluster.ADMIN_SCRIPT,
"persistent",
"create-subscription",
@@ -71,6 +73,7 @@ public void testCreateSubscriptionCommand() throws Exception {
"--subscription",
"" + subscriptionPrefix + i
);
+ assertEquals(0, result.getExitCode());
assertTrue(result.getStdout().isEmpty());
assertTrue(result.getStderr().isEmpty());
i++;
@@ -81,7 +84,7 @@ public void testCreateSubscriptionCommand() throws Exception {
public void testTopicTerminationOnTopicsWithoutConnectedConsumers() throws
Exception {
String topicName =
"persistent://public/default/test-topic-termination";
BrokerContainer container = pulsarCluster.getAnyBroker();
- ExecResult result = container.execCmd(
+ ContainerExecResult result = container.execCmd(
PulsarCluster.CLIENT_SCRIPT,
"produce",
"-m",
@@ -90,7 +93,8 @@ public void
testTopicTerminationOnTopicsWithoutConnectedConsumers() throws Excep
"1",
topicName);
- Assert.assertTrue(result.getStdout().contains("1 messages successfully
produced"));
+ assertEquals(0, result.getExitCode());
+ assertTrue(result.getStdout().contains("1 messages successfully
produced"));
// terminate the topic
result = container.execCmd(
@@ -98,7 +102,8 @@ public void
testTopicTerminationOnTopicsWithoutConnectedConsumers() throws Excep
"persistent",
"terminate",
topicName);
- Assert.assertTrue(result.getStdout().contains("Topic succesfully
terminated at"));
+ assertEquals(0, result.getExitCode());
+ assertTrue(result.getStdout().contains("Topic succesfully terminated
at"));
// try to produce should fail
result = pulsarCluster.getAnyBroker().execCmd(
@@ -109,6 +114,7 @@ public void
testTopicTerminationOnTopicsWithoutConnectedConsumers() throws Excep
"-n",
"1",
topicName);
+ assertNotEquals(0, result.getExitCode());
assertTrue(result.getStdout().contains("Topic was already
terminated"));
}
@@ -117,7 +123,7 @@ public void testSchemaCLI() throws Exception {
BrokerContainer container = pulsarCluster.getAnyBroker();
String topicName = "persistent://public/default/test-schema-cli";
- ExecResult result = container.execCmd(
+ ContainerExecResult result = container.execCmd(
PulsarCluster.CLIENT_SCRIPT,
"produce",
"-m",
@@ -125,7 +131,8 @@ public void testSchemaCLI() throws Exception {
"-n",
"1",
topicName);
- Assert.assertTrue(result.getStdout().contains("1 messages successfully
produced"));
+ assertEquals(0, result.getExitCode());
+ assertTrue(result.getStdout().contains("1 messages successfully
produced"));
result = container.execCmd(
PulsarCluster.ADMIN_SCRIPT,
@@ -135,8 +142,9 @@ public void testSchemaCLI() throws Exception {
"-f",
"/pulsar/conf/schema_example.conf"
);
- Assert.assertTrue(result.getStdout().isEmpty());
- Assert.assertTrue(result.getStderr().isEmpty());
+ assertEquals(0, result.getExitCode());
+ assertTrue(result.getStdout().isEmpty());
+ assertTrue(result.getStderr().isEmpty());
// get schema
result = container.execCmd(
@@ -144,7 +152,8 @@ public void testSchemaCLI() throws Exception {
"schemas",
"get",
topicName);
- Assert.assertTrue(result.getStdout().contains("\"type\" :
\"STRING\""));
+ assertEquals(0, result.getExitCode());
+ assertTrue(result.getStdout().contains("\"type\" : \"STRING\""));
// delete the schema
result = container.execCmd(
@@ -152,8 +161,9 @@ public void testSchemaCLI() throws Exception {
"schemas",
"delete",
topicName);
- Assert.assertTrue(result.getStdout().isEmpty());
- Assert.assertTrue(result.getStderr().isEmpty());
+ assertEquals(0, result.getExitCode());
+ assertTrue(result.getStdout().isEmpty());
+ assertTrue(result.getStderr().isEmpty());
// get schema again
result = container.execCmd(
@@ -162,6 +172,7 @@ public void testSchemaCLI() throws Exception {
"get",
"persistent://public/default/test-schema-cli"
);
+ assertNotEquals(0, result.getExitCode());
assertTrue(result.getStderr().contains("Reason: HTTP 404 Not Found"));
}
}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compaction/TestCompaction.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compaction/TestCompaction.java
index 6825e73520..05d6235de6 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compaction/TestCompaction.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compaction/TestCompaction.java
@@ -23,12 +23,11 @@
import org.apache.pulsar.client.api.MessageBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
-import org.testcontainers.containers.Container;
-import org.testng.Assert;
import org.testng.annotations.Test;
-import static java.util.stream.Collectors.joining;
+import static org.testng.Assert.assertEquals;
public class TestCompaction extends PulsarClusterTestBase {
@@ -54,12 +53,12 @@ public void testPublishCompactAndConsumeCLI(String
serviceUrl) throws Exception
try (Consumer<byte[]> consumer = client.newConsumer().topic(topic)
.readCompacted(true).subscriptionName("sub1").subscribe())
{
Message<byte[]> m = consumer.receive();
- Assert.assertEquals(m.getKey(), "key0");
- Assert.assertEquals(m.getData(), "content0".getBytes());
+ assertEquals(m.getKey(), "key0");
+ assertEquals(m.getData(), "content0".getBytes());
m = consumer.receive();
- Assert.assertEquals(m.getKey(), "key0");
- Assert.assertEquals(m.getData(), "content1".getBytes());
+ assertEquals(m.getKey(), "key0");
+ assertEquals(m.getData(), "content1".getBytes());
}
pulsarCluster.runPulsarBaseCommandOnAnyBroker("compact-topic",
"-t", topic);
@@ -67,8 +66,8 @@ public void testPublishCompactAndConsumeCLI(String
serviceUrl) throws Exception
try (Consumer<byte[]> consumer = client.newConsumer().topic(topic)
.readCompacted(true).subscriptionName("sub1").subscribe())
{
Message<byte[]> m = consumer.receive();
- Assert.assertEquals(m.getKey(), "key0");
- Assert.assertEquals(m.getData(), "content1".getBytes());
+ assertEquals(m.getKey(), "key0");
+ assertEquals(m.getData(), "content1".getBytes());
}
}
}
@@ -98,12 +97,12 @@ public void testPublishCompactAndConsumeRest(String
serviceUrl) throws Exception
try (Consumer<byte[]> consumer = client.newConsumer().topic(topic)
.readCompacted(true).subscriptionName("sub1").subscribe())
{
Message<byte[]> m = consumer.receive();
- Assert.assertEquals(m.getKey(), "key0");
- Assert.assertEquals(m.getData(), "content0".getBytes());
+ assertEquals(m.getKey(), "key0");
+ assertEquals(m.getData(), "content0".getBytes());
m = consumer.receive();
- Assert.assertEquals(m.getKey(), "key0");
- Assert.assertEquals(m.getData(), "content1".getBytes());
+ assertEquals(m.getKey(), "key0");
+ assertEquals(m.getData(), "content1".getBytes());
}
pulsarCluster.runAdminCommandOnAnyBroker("persistent",
"compact", topic);
@@ -114,8 +113,8 @@ public void testPublishCompactAndConsumeRest(String
serviceUrl) throws Exception
try (Consumer<byte[]> consumer = client.newConsumer().topic(topic)
.readCompacted(true).subscriptionName("sub1").subscribe())
{
Message<byte[]> m = consumer.receive();
- Assert.assertEquals(m.getKey(), "key0");
- Assert.assertEquals(m.getData(), "content1".getBytes());
+ assertEquals(m.getKey(), "key0");
+ assertEquals(m.getData(), "content1".getBytes());
}
}
}
@@ -126,7 +125,7 @@ private static void waitAndVerifyCompacted(PulsarClient
client, String topic,
try (Consumer<byte[]> consumer = client.newConsumer().topic(topic)
.readCompacted(true).subscriptionName(sub).subscribe()) {
Message<byte[]> m = consumer.receive();
- Assert.assertEquals(m.getKey(), expectedKey);
+ assertEquals(m.getKey(), expectedKey);
if (new String(m.getData()).equals(expectedValue)) {
break;
}
@@ -136,8 +135,8 @@ private static void waitAndVerifyCompacted(PulsarClient
client, String topic,
try (Consumer<byte[]> consumer = client.newConsumer().topic(topic)
.readCompacted(true).subscriptionName(sub).subscribe()) {
Message<byte[]> m = consumer.receive();
- Assert.assertEquals(m.getKey(), expectedKey);
- Assert.assertEquals(new String(m.getData()), expectedValue);
+ assertEquals(m.getKey(), expectedKey);
+ assertEquals(new String(m.getData()), expectedValue);
}
}
@@ -172,20 +171,24 @@ public void testPublishWithAutoCompaction(String
serviceUrl) throws Exception {
}
}
- private Container.ExecResult createTenantName(final String tenantName,
- final String
allowedClusterName,
- final String adminRoleName)
throws Exception {
- return pulsarCluster.runAdminCommandOnAnyBroker(
+ private ContainerExecResult createTenantName(final String tenantName,
+ final String
allowedClusterName,
+ final String adminRoleName)
throws Exception {
+ ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker(
"tenants", "create", "--allowed-clusters", allowedClusterName,
"--admin-roles", adminRoleName, tenantName);
+ assertEquals(0, result.getExitCode());
+ return result;
}
- private Container.ExecResult createNamespace(final String Ns) throws
Exception {
- return pulsarCluster.runAdminCommandOnAnyBroker(
+ private ContainerExecResult createNamespace(final String Ns) throws
Exception {
+ ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker(
"namespaces",
"create",
"--clusters",
pulsarCluster.getClusterName(), Ns);
+ assertEquals(0, result.getExitCode());
+ return result;
}
}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
index 32e9583b17..239954dc32 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
@@ -20,6 +20,7 @@
import static java.nio.charset.StandardCharsets.UTF_8;
+import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.api.command.LogContainerCmd;
import com.github.dockerjava.api.model.Frame;
import com.github.dockerjava.core.command.LogContainerResultCallback;
@@ -28,6 +29,7 @@
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.utils.DockerUtils;
import org.testcontainers.containers.GenericContainer;
@@ -101,18 +103,11 @@ public void onNext(Frame item) {
return sb.toString();
}
- public ExecResult execCmd(String... cmd) throws Exception {
- String cmdString = StringUtils.join(cmd, " ");
-
- log.info("DOCKER.exec({}:{}): Executing ...",
containerName.substring(1), cmdString);
-
- ExecResult result = execInContainer(cmd);
-
- log.info("Docker.exec({}:{}): Done", containerName.substring(1),
cmdString);
- log.info("Docker.exec({}:{}): Stdout -\n{}",
containerName.substring(1), cmdString, result.getStdout());
- log.info("Docker.exec({}:{}): Stderr -\n{}",
containerName.substring(1), cmdString, result.getStderr());
-
- return result;
+ public ContainerExecResult execCmd(String... commands) throws Exception {
+ DockerClient client = this.getDockerClient();
+ String dockerId = this.getContainerId();
+ return DockerUtils.runCommand(
+ client, dockerId, true, commands);
}
@Override
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/docker/ContainerExecResult.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/docker/ContainerExecResult.java
new file mode 100644
index 0000000000..fe040ad063
--- /dev/null
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/docker/ContainerExecResult.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.docker;
+
+import lombok.Data;
+
+/**
+ * Represents the result of executing a command.
+ */
+@Data(staticConstructor = "of")
+public class ContainerExecResult {
+
+ private final int exitCode;
+ private final String stdout;
+ private final String stderr;
+
+}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index d83661ca69..a541c092bd 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -18,13 +18,14 @@
*/
package org.apache.pulsar.tests.integration.functions;
+import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.tests.integration.containers.WorkerContainer;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import
org.apache.pulsar.tests.integration.functions.utils.UploadDownloadCommandGenerator;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
-import org.testcontainers.containers.Container.ExecResult;
import org.testng.annotations.Test;
@Slf4j
@@ -50,7 +51,8 @@ public String checkUpload() throws Exception {
String[] commands = {
"sh", "-c", actualCommand
};
- ExecResult output = pulsarCluster.getAnyWorker().execCmd(commands);
+ ContainerExecResult output =
pulsarCluster.getAnyWorker().execCmd(commands);
+ assertEquals(0, output.getExitCode());
assertTrue(output.getStdout().contains("\"Uploaded successfully\""));
return bkPkgPath;
}
@@ -71,7 +73,8 @@ public void checkDownload() throws Exception {
"sh", "-c", actualCommand
};
WorkerContainer container = pulsarCluster.getAnyWorker();
- ExecResult output = container.execCmd(commands);
+ ContainerExecResult output = container.execCmd(commands);
+ assertEquals(0, output.getExitCode());
assertTrue(output.getStdout().contains("\"Downloaded successfully\""));
String[] diffCommand = {
"diff",
@@ -79,6 +82,7 @@ public void checkDownload() throws Exception {
localPkgFile
};
output = container.execCmd(diffCommand);
+ assertEquals(0, output.getExitCode());
assertTrue(output.getStdout().isEmpty());
assertTrue(output.getStderr().isEmpty());
}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsRuntimeTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsRuntimeTest.java
index 6477323d99..8b7bafc566 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsRuntimeTest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsRuntimeTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.tests.integration.functions.runtime;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import lombok.Cleanup;
@@ -28,6 +29,7 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.functions.PulsarFunctionsTestBase;
import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
import
org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
@@ -103,13 +105,14 @@ private static void submitExclamationFunction(Runtime
runtime,
String[] commands = {
"sh", "-c", command
};
- ExecResult result = pulsarCluster.getAnyWorker().execCmd(
+ ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
commands);
+ assertEquals(0, result.getExitCode());
assertTrue(result.getStdout().contains("\"Created successfully\""));
}
private static void getFunctionInfoSuccess(String functionName) throws
Exception {
- ExecResult result = pulsarCluster.getAnyWorker().execCmd(
+ ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
PulsarCluster.ADMIN_SCRIPT,
"functions",
"get",
@@ -117,11 +120,12 @@ private static void getFunctionInfoSuccess(String
functionName) throws Exception
"--namespace", "default",
"--name", functionName
);
+ assertEquals(0, result.getExitCode());
assertTrue(result.getStdout().contains("\"name\": \"" + functionName +
"\""));
}
private static void getFunctionInfoNotFound(String functionName) throws
Exception {
- ExecResult result = pulsarCluster.getAnyWorker().execCmd(
+ ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
PulsarCluster.ADMIN_SCRIPT,
"functions",
"get",
@@ -129,11 +133,12 @@ private static void getFunctionInfoNotFound(String
functionName) throws Exceptio
"--namespace", "default",
"--name", functionName
);
+ assertNotEquals(0, result.getExitCode());
assertTrue(result.getStderr().contains("Reason: Function " +
functionName + " doesn't exist"));
}
private static void getFunctionStatus(String functionName, int
numMessages) throws Exception {
- ExecResult result = pulsarCluster.getAnyWorker().execCmd(
+ ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
PulsarCluster.ADMIN_SCRIPT,
"functions",
"getstatus",
@@ -141,6 +146,7 @@ private static void getFunctionStatus(String functionName,
int numMessages) thro
"--namespace", "default",
"--name", functionName
);
+ assertEquals(0, result.getExitCode());
assertTrue(result.getStdout().contains("\"running\": true"));
assertTrue(result.getStdout().contains("\"numProcessed\": \"" +
numMessages + "\""));
assertTrue(result.getStdout().contains("\"numSuccessfullyProcessed\":
\"" + numMessages + "\""));
@@ -172,7 +178,7 @@ private static void publishAndConsumeMessages(String
inputTopic,
}
private static void deleteFunction(String functionName) throws Exception {
- ExecResult result = pulsarCluster.getAnyWorker().execCmd(
+ ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
PulsarCluster.ADMIN_SCRIPT,
"functions",
"delete",
@@ -180,6 +186,7 @@ private static void deleteFunction(String functionName)
throws Exception {
"--namespace", "default",
"--name", functionName
);
+ assertEquals(0, result.getExitCode());
assertTrue(result.getStdout().contains("Deleted successfully"));
assertTrue(result.getStderr().isEmpty());
}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSinkTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSinkTest.java
index d0aab47f36..71cad0501b 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSinkTest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSinkTest.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.tests.integration.io;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import com.google.common.base.Stopwatch;
@@ -32,11 +34,11 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.functions.PulsarFunctionsTestBase;
import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import
org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec.PulsarClusterSpecBuilder;
-import org.testcontainers.containers.Container.ExecResult;
import org.testcontainers.containers.GenericContainer;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Factory;
@@ -131,7 +133,8 @@ protected void submitSinkConnector(String tenant,
"--inputs", inputTopicName
};
log.info("Run command : {}", StringUtils.join(commands, ' '));
- ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+ ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
+ assertEquals(0, result.getExitCode());
assertTrue(
result.getStdout().contains("\"Created successfully\""),
result.getStdout());
@@ -146,8 +149,9 @@ protected void getSinkInfoSuccess(String tenant, String
namespace, String sinkNa
"--namespace", namespace,
"--name", sinkName
};
- ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+ ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
log.info("Get sink info : {}", result.getStdout());
+ assertEquals(0, result.getExitCode());
assertTrue(
result.getStdout().contains("\"builtin\": \"" + tester.sinkType +
"\""),
result.getStdout()
@@ -164,9 +168,9 @@ protected void getSinkStatus(String tenant, String
namespace, String sinkName) t
"--name", sinkName
};
while (true) {
- ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+ ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
log.info("Get sink status : {}", result.getStdout());
- if (result.getStdout().contains("\"running\": true")) {
+ if (0 == result.getExitCode() &&
result.getStdout().contains("\"running\": true")) {
return;
}
log.info("Backoff 1 second until the function is running");
@@ -211,9 +215,10 @@ protected void waitForProcessingMessages(String tenant,
};
Stopwatch stopwatch = Stopwatch.createStarted();
while (true) {
- ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+ ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
log.info("Get sink status : {}", result.getStdout());
- if (result.getStdout().contains("\"numProcessed\": \"" +
numMessages + "\"")) {
+ if (0 == result.getExitCode()
+ && result.getStdout().contains("\"numProcessed\": \"" +
numMessages + "\"")) {
return;
}
log.info("{} ms has elapsed but the sink hasn't process {}
messages, backoff to wait for another 1 second",
@@ -231,7 +236,8 @@ protected void deleteSink(String tenant, String namespace,
String sinkName) thro
"--namespace", namespace,
"--name", sinkName
};
- ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+ ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
+ assertEquals(0, result.getExitCode());
assertTrue(
result.getStdout().contains("Deleted successfully"),
result.getStdout()
@@ -251,7 +257,8 @@ protected void getSinkInfoNotFound(String tenant, String
namespace, String sinkN
"--namespace", namespace,
"--name", sinkName
};
- ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+ ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
+ assertNotEquals(0, result.getExitCode());
assertTrue(result.getStderr().contains("Reason: Function " + sinkName
+ " doesn't exist"));
}
}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSourceTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSourceTest.java
index 78d8874d4f..8468976e23 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSourceTest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSourceTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.tests.integration.io;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import com.google.common.base.Stopwatch;
@@ -34,11 +35,11 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.functions.PulsarFunctionsTestBase;
import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import
org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec.PulsarClusterSpecBuilder;
-import org.testcontainers.containers.Container.ExecResult;
import org.testcontainers.containers.GenericContainer;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Factory;
@@ -143,7 +144,8 @@ protected void submitSourceConnector(String tenant,
"--destinationTopicName", outputTopicName
};
log.info("Run command : {}", StringUtils.join(commands, ' '));
- ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+ ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
+ assertEquals(0, result.getExitCode());
assertTrue(
result.getStdout().contains("\"Created successfully\""),
result.getStdout());
@@ -158,8 +160,9 @@ protected void getSourceInfoSuccess(String tenant, String
namespace, String sour
"--namespace", namespace,
"--name", sourceName
};
- ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+ ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
log.info("Get source info : {}", result.getStdout());
+ assertEquals(0, result.getExitCode());
assertTrue(
result.getStdout().contains("\"builtin\": \"" + tester.sourceType
+ "\""),
result.getStdout()
@@ -176,9 +179,9 @@ protected void getSourceStatus(String tenant, String
namespace, String sourceNam
"--name", sourceName
};
while (true) {
- ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+ ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
log.info("Get source status : {}", result.getStdout());
- if (result.getStdout().contains("\"running\": true")) {
+ if (0 == result.getExitCode() &&
result.getStdout().contains("\"running\": true")) {
return;
}
log.info("Backoff 1 second until the function is running");
@@ -209,9 +212,10 @@ protected void waitForProcessingMessages(String tenant,
};
Stopwatch stopwatch = Stopwatch.createStarted();
while (true) {
- ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+ ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
log.info("Get source status : {}", result.getStdout());
- if (result.getStdout().contains("\"numProcessed\": \"" +
numMessages + "\"")) {
+ if (0 == result.getExitCode()
+ && result.getStdout().contains("\"numProcessed\": \"" +
numMessages + "\"")) {
return;
}
log.info("{} ms has elapsed but the source hasn't process {}
messages, backoff to wait for another 1 second",
@@ -229,7 +233,8 @@ protected void deleteSource(String tenant, String
namespace, String sourceName)
"--namespace", namespace,
"--name", sourceName
};
- ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+ ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
+ assertEquals(0, result.getExitCode());
assertTrue(
result.getStdout().contains("Delete source successfully"),
result.getStdout()
@@ -249,7 +254,8 @@ protected void getSourceInfoNotFound(String tenant, String
namespace, String sou
"--namespace", namespace,
"--name", sourceName
};
- ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+ ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
+ assertNotEquals(0, result.getExitCode());
assertTrue(result.getStderr().contains("Reason: Function " +
sourceName + " doesn't exist"));
}
}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/smoke/SmokeTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/smoke/SmokeTest.java
index 85a8d2db35..b22bd7476b 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/smoke/SmokeTest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/smoke/SmokeTest.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.tests.integration.smoke;
+import static org.testng.Assert.assertEquals;
+
import lombok.Cleanup;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
@@ -25,9 +27,8 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
-import org.testcontainers.containers.Container;
-import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.concurrent.TimeUnit;
@@ -69,15 +70,17 @@ public void testPublishAndConsume(String serviceUrl) throws
Exception {
}
for (int i = 0; i < 10; i++) {
Message m = consumer.receive();
- Assert.assertEquals("smoke-message" + i, new String(m.getData()));
+ assertEquals("smoke-message" + i, new String(m.getData()));
}
}
- private Container.ExecResult createTenantName(String tenantName,
- String clusterName,
- String roleName) throws
Exception {
- return pulsarCluster.runAdminCommandOnAnyBroker(
+ private ContainerExecResult createTenantName(String tenantName,
+ String clusterName,
+ String roleName) throws
Exception {
+ ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker(
"tenants", "create", tenantName, "--allowed-clusters", clusterName,
"--admin-roles", roleName);
+ assertEquals(0, result.getExitCode());
+ return result;
}
}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index bdd8123a0f..2f1653a9b4 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -43,7 +43,7 @@
import org.apache.pulsar.tests.integration.containers.PulsarContainer;
import org.apache.pulsar.tests.integration.containers.WorkerContainer;
import org.apache.pulsar.tests.integration.containers.ZKContainer;
-import org.testcontainers.containers.Container.ExecResult;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
@@ -315,15 +315,15 @@ public WorkerContainer getAnyWorker() {
return brokerContainers.values();
}
- public ExecResult runAdminCommandOnAnyBroker(String...commands) throws Exception {
+ public ContainerExecResult runAdminCommandOnAnyBroker(String...commands) throws Exception {
return runCommandOnAnyBrokerWithScript(ADMIN_SCRIPT, commands);
}
- public ExecResult runPulsarBaseCommandOnAnyBroker(String...commands) throws Exception {
+ public ContainerExecResult runPulsarBaseCommandOnAnyBroker(String...commands) throws Exception {
return runCommandOnAnyBrokerWithScript(PULSAR_COMMAND_SCRIPT, commands);
}
- private ExecResult runCommandOnAnyBrokerWithScript(String scriptType, String...commands) throws Exception {
+ private ContainerExecResult runCommandOnAnyBrokerWithScript(String scriptType, String...commands) throws Exception {
BrokerContainer container = getAnyBroker();
String[] cmds = new String[commands.length + 1];
cmds[0] = scriptType;
@@ -339,13 +339,13 @@ public void startAllBrokers() {
brokerContainers.values().forEach(BrokerContainer::start);
}
- public ExecResult createNamespace(String nsName) throws Exception {
+ public ContainerExecResult createNamespace(String nsName) throws Exception {
return runAdminCommandOnAnyBroker(
"namespaces", "create", "public/" + nsName,
"--clusters", clusterName);
}
- public ExecResult enableDeduplication(String nsName, boolean enabled) throws Exception {
+ public ContainerExecResult enableDeduplication(String nsName, boolean enabled) throws Exception {
return runAdminCommandOnAnyBroker(
"namespaces", "set-deduplication", "public/" + nsName,
enabled ? "--enable" : "--disable");
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java
index 51fb8d8790..8684201bb8 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.tests.integration.utils;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.api.async.ResultCallback;
import com.github.dockerjava.api.command.InspectContainerResponse;
@@ -25,6 +27,7 @@
import com.github.dockerjava.api.model.Frame;
import com.github.dockerjava.api.model.ContainerNetwork;
+import com.github.dockerjava.api.model.StreamType;
import java.io.Closeable;
import java.io.File;
import java.io.FileOutputStream;
@@ -34,16 +37,15 @@
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
-import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.zip.GZIPOutputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -162,16 +164,26 @@ public static String getContainerIP(DockerClient docker, String containerId) {
throw new IllegalArgumentException("Container " + containerId + " has no networks");
}
- public static String getContainerHostname(DockerClient docker, String containerId) {
- return runCommand(docker, containerId, "hostname").trim();
+ public static ContainerExecResult runCommand(DockerClient docker, String containerId, String... cmd)
+ throws Exception {
+ return runCommand(docker, containerId, false, cmd);
}
- public static String runCommand(DockerClient docker, String containerId, String... cmd) {
+ public static ContainerExecResult runCommand(DockerClient docker,
+ String containerId,
+ boolean ignoreError,
+ String... cmd)
+ throws Exception {
CompletableFuture<Boolean> future = new CompletableFuture<>();
- String execid = docker.execCreateCmd(containerId).withCmd(cmd)
- .withAttachStderr(true).withAttachStdout(true).exec().getId();
+ String execid = docker.execCreateCmd(containerId)
+ .withCmd(cmd)
+ .withAttachStderr(true)
+ .withAttachStdout(true)
+ .exec()
+ .getId();
String cmdString = Arrays.stream(cmd).collect(Collectors.joining(" "));
- StringBuffer output = new StringBuffer();
+ StringBuilder stdout = new StringBuilder();
+ StringBuilder stderr = new StringBuilder();
docker.execStartCmd(execid).withDetach(false)
.exec(new ResultCallback<Frame>() {
@Override
@@ -185,7 +197,11 @@ public void onStart(Closeable closeable) {
@Override
public void onNext(Frame object) {
LOG.info("DOCKER.exec({}:{}): {}", containerId, cmdString, object);
- output.append(new String(object.getPayload()));
+ if (StreamType.STDOUT == object.getStreamType()) {
+ stdout.append(new String(object.getPayload(), UTF_8));
+ } else if (StreamType.STDERR == object.getStreamType()) {
+ stderr.append(new String(object.getPayload(), UTF_8));
+ }
}
@Override
@@ -213,14 +229,22 @@ public void onComplete() {
}
int retCode = resp.getExitCode();
if (retCode != 0) {
- LOG.error("DOCKER.exec({}:{}): failed with {} : {}", containerId, cmdString, retCode, output);
- throw new RuntimeException(
- String.format("cmd(%s) failed on %s with exitcode %d",
- cmdString, containerId, retCode));
+ if (!ignoreError) {
+ LOG.error("DOCKER.exec({}:{}): failed with {} :\nStdout:\n{}\n\nStderr:\n{}",
+ containerId, cmdString, retCode, stdout.toString(), stderr.toString());
+ throw new Exception(String.format("cmd(%s) failed on %s with exitcode %d",
+ cmdString, containerId, retCode));
+ } else {
+ LOG.error("DOCKER.exec({}:{}): failed with {}", containerId, cmdString, retCode);
+ }
} else {
LOG.info("DOCKER.exec({}:{}): completed with {}", containerId, cmdString, retCode);
}
- return output.toString();
+ return ContainerExecResult.of(
+ retCode,
+ stdout.toString(),
+ stderr.toString()
+ );
}
public static Optional<String> getContainerCluster(DockerClient docker, String containerId) {
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services