sijie closed pull request #2233: [integration tests] Return exit code as part of execution result of running a command in testcontainers
URL: https://github.com/apache/incubator-pulsar/pull/2233
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff of a pull request from a fork once it has been merged, the
diff is reproduced below for the sake of provenance:

diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
index 7b4d0d00dd..e69b0dd4a5 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
@@ -18,14 +18,15 @@
  */
 package org.apache.pulsar.tests.integration.cli;
 
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertTrue;
 
 import org.apache.pulsar.tests.integration.containers.BrokerContainer;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
-import org.testcontainers.containers.Container.ExecResult;
-import org.testng.Assert;
 import org.testng.annotations.Test;
 
 /**
@@ -37,7 +38,8 @@
     public void testDeprecatedCommands() throws Exception {
         String tenantName = "test-deprecated-commands";
 
-        ExecResult result = pulsarCluster.runAdminCommandOnAnyBroker("--help");
+        ContainerExecResult result = 
pulsarCluster.runAdminCommandOnAnyBroker("--help");
+        assertEquals(0, result.getExitCode());
         assertFalse(result.getStdout().isEmpty());
         assertFalse(result.getStdout().contains("Usage: properties "));
         result = pulsarCluster.runAdminCommandOnAnyBroker(
@@ -63,7 +65,7 @@ public void testCreateSubscriptionCommand() throws Exception {
 
         int i = 0;
         for (BrokerContainer container : pulsarCluster.getBrokers()) {
-            ExecResult result = container.execCmd(
+            ContainerExecResult result = container.execCmd(
                 PulsarCluster.ADMIN_SCRIPT,
                 "persistent",
                 "create-subscription",
@@ -71,6 +73,7 @@ public void testCreateSubscriptionCommand() throws Exception {
                 "--subscription",
                 "" + subscriptionPrefix + i
             );
+            assertEquals(0, result.getExitCode());
             assertTrue(result.getStdout().isEmpty());
             assertTrue(result.getStderr().isEmpty());
             i++;
@@ -81,7 +84,7 @@ public void testCreateSubscriptionCommand() throws Exception {
     public void testTopicTerminationOnTopicsWithoutConnectedConsumers() throws 
Exception {
         String topicName = 
"persistent://public/default/test-topic-termination";
         BrokerContainer container = pulsarCluster.getAnyBroker();
-        ExecResult result = container.execCmd(
+        ContainerExecResult result = container.execCmd(
             PulsarCluster.CLIENT_SCRIPT,
             "produce",
             "-m",
@@ -90,7 +93,8 @@ public void 
testTopicTerminationOnTopicsWithoutConnectedConsumers() throws Excep
             "1",
             topicName);
 
-        Assert.assertTrue(result.getStdout().contains("1 messages successfully produced"));
+        assertEquals(0, result.getExitCode());
+        assertTrue(result.getStdout().contains("1 messages successfully produced"));
 
         // terminate the topic
         result = container.execCmd(
@@ -98,7 +102,8 @@ public void 
testTopicTerminationOnTopicsWithoutConnectedConsumers() throws Excep
             "persistent",
             "terminate",
             topicName);
-        Assert.assertTrue(result.getStdout().contains("Topic succesfully terminated at"));
+        assertEquals(0, result.getExitCode());
+        assertTrue(result.getStdout().contains("Topic succesfully terminated at"));
 
         // try to produce should fail
         result = pulsarCluster.getAnyBroker().execCmd(
@@ -109,6 +114,7 @@ public void 
testTopicTerminationOnTopicsWithoutConnectedConsumers() throws Excep
             "-n",
             "1",
             topicName);
+        assertNotEquals(0, result.getExitCode());
         assertTrue(result.getStdout().contains("Topic was already terminated"));
     }
 
@@ -117,7 +123,7 @@ public void testSchemaCLI() throws Exception {
         BrokerContainer container = pulsarCluster.getAnyBroker();
         String topicName = "persistent://public/default/test-schema-cli";
 
-        ExecResult result = container.execCmd(
+        ContainerExecResult result = container.execCmd(
             PulsarCluster.CLIENT_SCRIPT,
             "produce",
             "-m",
@@ -125,7 +131,8 @@ public void testSchemaCLI() throws Exception {
             "-n",
             "1",
             topicName);
-        Assert.assertTrue(result.getStdout().contains("1 messages successfully produced"));
+        assertEquals(0, result.getExitCode());
+        assertTrue(result.getStdout().contains("1 messages successfully produced"));
 
         result = container.execCmd(
             PulsarCluster.ADMIN_SCRIPT,
@@ -135,8 +142,9 @@ public void testSchemaCLI() throws Exception {
             "-f",
             "/pulsar/conf/schema_example.conf"
         );
-        Assert.assertTrue(result.getStdout().isEmpty());
-        Assert.assertTrue(result.getStderr().isEmpty());
+        assertEquals(0, result.getExitCode());
+        assertTrue(result.getStdout().isEmpty());
+        assertTrue(result.getStderr().isEmpty());
 
         // get schema
         result = container.execCmd(
@@ -144,7 +152,8 @@ public void testSchemaCLI() throws Exception {
             "schemas",
             "get",
             topicName);
-        Assert.assertTrue(result.getStdout().contains("\"type\" : \"STRING\""));
+        assertEquals(0, result.getExitCode());
+        assertTrue(result.getStdout().contains("\"type\" : \"STRING\""));
 
         // delete the schema
         result = container.execCmd(
@@ -152,8 +161,9 @@ public void testSchemaCLI() throws Exception {
             "schemas",
             "delete",
             topicName);
-        Assert.assertTrue(result.getStdout().isEmpty());
-        Assert.assertTrue(result.getStderr().isEmpty());
+        assertEquals(0, result.getExitCode());
+        assertTrue(result.getStdout().isEmpty());
+        assertTrue(result.getStderr().isEmpty());
 
         // get schema again
         result = container.execCmd(
@@ -162,6 +172,7 @@ public void testSchemaCLI() throws Exception {
             "get",
             "persistent://public/default/test-schema-cli"
         );
+        assertNotEquals(0, result.getExitCode());
         assertTrue(result.getStderr().contains("Reason: HTTP 404 Not Found"));
     }
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compaction/TestCompaction.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compaction/TestCompaction.java
index 6825e73520..05d6235de6 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compaction/TestCompaction.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compaction/TestCompaction.java
@@ -23,12 +23,11 @@
 import org.apache.pulsar.client.api.MessageBuilder;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
-import org.testcontainers.containers.Container;
-import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import static java.util.stream.Collectors.joining;
+import static org.testng.Assert.assertEquals;
 
 public class TestCompaction extends PulsarClusterTestBase {
 
@@ -54,12 +53,12 @@ public void testPublishCompactAndConsumeCLI(String 
serviceUrl) throws Exception
             try (Consumer<byte[]> consumer = client.newConsumer().topic(topic)
                     .readCompacted(true).subscriptionName("sub1").subscribe()) 
{
                 Message<byte[]> m = consumer.receive();
-                Assert.assertEquals(m.getKey(), "key0");
-                Assert.assertEquals(m.getData(), "content0".getBytes());
+                assertEquals(m.getKey(), "key0");
+                assertEquals(m.getData(), "content0".getBytes());
 
                 m = consumer.receive();
-                Assert.assertEquals(m.getKey(), "key0");
-                Assert.assertEquals(m.getData(), "content1".getBytes());
+                assertEquals(m.getKey(), "key0");
+                assertEquals(m.getData(), "content1".getBytes());
             }
 
             pulsarCluster.runPulsarBaseCommandOnAnyBroker("compact-topic", 
"-t", topic);
@@ -67,8 +66,8 @@ public void testPublishCompactAndConsumeCLI(String 
serviceUrl) throws Exception
             try (Consumer<byte[]> consumer = client.newConsumer().topic(topic)
                     .readCompacted(true).subscriptionName("sub1").subscribe()) 
{
                 Message<byte[]> m = consumer.receive();
-                Assert.assertEquals(m.getKey(), "key0");
-                Assert.assertEquals(m.getData(), "content1".getBytes());
+                assertEquals(m.getKey(), "key0");
+                assertEquals(m.getData(), "content1".getBytes());
             }
         }
     }
@@ -98,12 +97,12 @@ public void testPublishCompactAndConsumeRest(String 
serviceUrl) throws Exception
             try (Consumer<byte[]> consumer = client.newConsumer().topic(topic)
                     .readCompacted(true).subscriptionName("sub1").subscribe()) 
{
                 Message<byte[]> m = consumer.receive();
-                Assert.assertEquals(m.getKey(), "key0");
-                Assert.assertEquals(m.getData(), "content0".getBytes());
+                assertEquals(m.getKey(), "key0");
+                assertEquals(m.getData(), "content0".getBytes());
 
                 m = consumer.receive();
-                Assert.assertEquals(m.getKey(), "key0");
-                Assert.assertEquals(m.getData(), "content1".getBytes());
+                assertEquals(m.getKey(), "key0");
+                assertEquals(m.getData(), "content1".getBytes());
             }
             pulsarCluster.runAdminCommandOnAnyBroker("persistent",
                     "compact", topic);
@@ -114,8 +113,8 @@ public void testPublishCompactAndConsumeRest(String 
serviceUrl) throws Exception
             try (Consumer<byte[]> consumer = client.newConsumer().topic(topic)
                     .readCompacted(true).subscriptionName("sub1").subscribe()) 
{
                 Message<byte[]> m = consumer.receive();
-                Assert.assertEquals(m.getKey(), "key0");
-                Assert.assertEquals(m.getData(), "content1".getBytes());
+                assertEquals(m.getKey(), "key0");
+                assertEquals(m.getData(), "content1".getBytes());
             }
         }
     }
@@ -126,7 +125,7 @@ private static void waitAndVerifyCompacted(PulsarClient 
client, String topic,
             try (Consumer<byte[]> consumer = client.newConsumer().topic(topic)
                  .readCompacted(true).subscriptionName(sub).subscribe()) {
                 Message<byte[]> m = consumer.receive();
-                Assert.assertEquals(m.getKey(), expectedKey);
+                assertEquals(m.getKey(), expectedKey);
                 if (new String(m.getData()).equals(expectedValue)) {
                     break;
                 }
@@ -136,8 +135,8 @@ private static void waitAndVerifyCompacted(PulsarClient 
client, String topic,
         try (Consumer<byte[]> consumer = client.newConsumer().topic(topic)
                 .readCompacted(true).subscriptionName(sub).subscribe()) {
             Message<byte[]> m = consumer.receive();
-            Assert.assertEquals(m.getKey(), expectedKey);
-            Assert.assertEquals(new String(m.getData()), expectedValue);
+            assertEquals(m.getKey(), expectedKey);
+            assertEquals(new String(m.getData()), expectedValue);
         }
     }
 
@@ -172,20 +171,24 @@ public void testPublishWithAutoCompaction(String 
serviceUrl) throws Exception {
         }
     }
 
-    private Container.ExecResult createTenantName(final String tenantName,
-                                                  final String 
allowedClusterName,
-                                                  final String adminRoleName) 
throws Exception {
-        return pulsarCluster.runAdminCommandOnAnyBroker(
+    private ContainerExecResult createTenantName(final String tenantName,
+                                                 final String 
allowedClusterName,
+                                                 final String adminRoleName) 
throws Exception {
+        ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker(
             "tenants", "create", "--allowed-clusters", allowedClusterName,
             "--admin-roles", adminRoleName, tenantName);
+        assertEquals(0, result.getExitCode());
+        return result;
     }
 
-    private Container.ExecResult createNamespace(final String Ns) throws 
Exception {
-        return pulsarCluster.runAdminCommandOnAnyBroker(
+    private ContainerExecResult createNamespace(final String Ns) throws 
Exception {
+        ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker(
                 "namespaces",
                 "create",
                 "--clusters",
                 pulsarCluster.getClusterName(), Ns);
+        assertEquals(0, result.getExitCode());
+        return result;
     }
 
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
index 32e9583b17..239954dc32 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
@@ -20,6 +20,7 @@
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
+import com.github.dockerjava.api.DockerClient;
 import com.github.dockerjava.api.command.LogContainerCmd;
 import com.github.dockerjava.api.model.Frame;
 import com.github.dockerjava.core.command.LogContainerResultCallback;
@@ -28,6 +29,7 @@
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.apache.pulsar.tests.integration.utils.DockerUtils;
 import org.testcontainers.containers.GenericContainer;
 
@@ -101,18 +103,11 @@ public void onNext(Frame item) {
         return sb.toString();
     }
 
-    public ExecResult execCmd(String... cmd) throws Exception {
-        String cmdString = StringUtils.join(cmd, " ");
-
-        log.info("DOCKER.exec({}:{}): Executing ...", 
containerName.substring(1), cmdString);
-
-        ExecResult result = execInContainer(cmd);
-
-        log.info("Docker.exec({}:{}): Done", containerName.substring(1), 
cmdString);
-        log.info("Docker.exec({}:{}): Stdout -\n{}", 
containerName.substring(1), cmdString, result.getStdout());
-        log.info("Docker.exec({}:{}): Stderr -\n{}", 
containerName.substring(1), cmdString, result.getStderr());
-
-        return result;
+    public ContainerExecResult execCmd(String... commands) throws Exception {
+        DockerClient client = this.getDockerClient();
+        String dockerId = this.getContainerId();
+        return DockerUtils.runCommand(
+            client, dockerId, true, commands);
     }
 
     @Override
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/docker/ContainerExecResult.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/docker/ContainerExecResult.java
new file mode 100644
index 0000000000..fe040ad063
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/docker/ContainerExecResult.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.docker;
+
+import lombok.Data;
+
+/**
+ * Represents the result of executing a command.
+ */
+@Data(staticConstructor = "of")
+public class ContainerExecResult {
+
+    private final int exitCode;
+    private final String stdout;
+    private final String stderr;
+
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index d83661ca69..a541c092bd 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -18,13 +18,14 @@
  */
 package org.apache.pulsar.tests.integration.functions;
 
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.tests.integration.containers.WorkerContainer;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import 
org.apache.pulsar.tests.integration.functions.utils.UploadDownloadCommandGenerator;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
-import org.testcontainers.containers.Container.ExecResult;
 import org.testng.annotations.Test;
 
 @Slf4j
@@ -50,7 +51,8 @@ public String checkUpload() throws Exception {
         String[] commands = {
             "sh", "-c", actualCommand
         };
-        ExecResult output = pulsarCluster.getAnyWorker().execCmd(commands);
+        ContainerExecResult output = 
pulsarCluster.getAnyWorker().execCmd(commands);
+        assertEquals(0, output.getExitCode());
         assertTrue(output.getStdout().contains("\"Uploaded successfully\""));
         return bkPkgPath;
     }
@@ -71,7 +73,8 @@ public void checkDownload() throws Exception {
             "sh", "-c", actualCommand
         };
         WorkerContainer container = pulsarCluster.getAnyWorker();
-        ExecResult output = container.execCmd(commands);
+        ContainerExecResult output = container.execCmd(commands);
+        assertEquals(0, output.getExitCode());
         assertTrue(output.getStdout().contains("\"Downloaded successfully\""));
         String[] diffCommand = {
             "diff",
@@ -79,6 +82,7 @@ public void checkDownload() throws Exception {
             localPkgFile
         };
         output = container.execCmd(diffCommand);
+        assertEquals(0, output.getExitCode());
         assertTrue(output.getStdout().isEmpty());
         assertTrue(output.getStderr().isEmpty());
     }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsRuntimeTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsRuntimeTest.java
index 6477323d99..8b7bafc566 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsRuntimeTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsRuntimeTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.tests.integration.functions.runtime;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertTrue;
 
 import lombok.Cleanup;
@@ -28,6 +29,7 @@
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.apache.pulsar.tests.integration.functions.PulsarFunctionsTestBase;
 import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
 import 
org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
@@ -103,13 +105,14 @@ private static void submitExclamationFunction(Runtime 
runtime,
         String[] commands = {
             "sh", "-c", command
         };
-        ExecResult result = pulsarCluster.getAnyWorker().execCmd(
+        ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
             commands);
+        assertEquals(0, result.getExitCode());
         assertTrue(result.getStdout().contains("\"Created successfully\""));
     }
 
     private static void getFunctionInfoSuccess(String functionName) throws 
Exception {
-        ExecResult result = pulsarCluster.getAnyWorker().execCmd(
+        ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
             PulsarCluster.ADMIN_SCRIPT,
             "functions",
             "get",
@@ -117,11 +120,12 @@ private static void getFunctionInfoSuccess(String 
functionName) throws Exception
             "--namespace", "default",
             "--name", functionName
         );
+        assertEquals(0, result.getExitCode());
         assertTrue(result.getStdout().contains("\"name\": \"" + functionName + 
"\""));
     }
 
     private static void getFunctionInfoNotFound(String functionName) throws 
Exception {
-        ExecResult result = pulsarCluster.getAnyWorker().execCmd(
+        ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
             PulsarCluster.ADMIN_SCRIPT,
             "functions",
             "get",
@@ -129,11 +133,12 @@ private static void getFunctionInfoNotFound(String 
functionName) throws Exceptio
             "--namespace", "default",
             "--name", functionName
         );
+        assertNotEquals(0, result.getExitCode());
         assertTrue(result.getStderr().contains("Reason: Function " + 
functionName + " doesn't exist"));
     }
 
     private static void getFunctionStatus(String functionName, int 
numMessages) throws Exception {
-        ExecResult result = pulsarCluster.getAnyWorker().execCmd(
+        ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
             PulsarCluster.ADMIN_SCRIPT,
             "functions",
             "getstatus",
@@ -141,6 +146,7 @@ private static void getFunctionStatus(String functionName, 
int numMessages) thro
             "--namespace", "default",
             "--name", functionName
         );
+        assertEquals(0, result.getExitCode());
         assertTrue(result.getStdout().contains("\"running\": true"));
         assertTrue(result.getStdout().contains("\"numProcessed\": \"" + 
numMessages + "\""));
         assertTrue(result.getStdout().contains("\"numSuccessfullyProcessed\": \"" + numMessages + "\""));
@@ -172,7 +178,7 @@ private static void publishAndConsumeMessages(String 
inputTopic,
     }
 
     private static void deleteFunction(String functionName) throws Exception {
-        ExecResult result = pulsarCluster.getAnyWorker().execCmd(
+        ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
             PulsarCluster.ADMIN_SCRIPT,
             "functions",
             "delete",
@@ -180,6 +186,7 @@ private static void deleteFunction(String functionName) 
throws Exception {
             "--namespace", "default",
             "--name", functionName
         );
+        assertEquals(0, result.getExitCode());
         assertTrue(result.getStdout().contains("Deleted successfully"));
         assertTrue(result.getStderr().isEmpty());
     }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSinkTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSinkTest.java
index d0aab47f36..71cad0501b 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSinkTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSinkTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.tests.integration.io;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertTrue;
 
 import com.google.common.base.Stopwatch;
@@ -32,11 +34,11 @@
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.apache.pulsar.tests.integration.functions.PulsarFunctionsTestBase;
 import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import 
org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec.PulsarClusterSpecBuilder;
-import org.testcontainers.containers.Container.ExecResult;
 import org.testcontainers.containers.GenericContainer;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Factory;
@@ -131,7 +133,8 @@ protected void submitSinkConnector(String tenant,
             "--inputs", inputTopicName
         };
         log.info("Run command : {}", StringUtils.join(commands, ' '));
-        ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+        ContainerExecResult result = 
pulsarCluster.getAnyWorker().execCmd(commands);
+        assertEquals(0, result.getExitCode());
         assertTrue(
             result.getStdout().contains("\"Created successfully\""),
             result.getStdout());
@@ -146,8 +149,9 @@ protected void getSinkInfoSuccess(String tenant, String 
namespace, String sinkNa
             "--namespace", namespace,
             "--name", sinkName
         };
-        ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+        ContainerExecResult result = 
pulsarCluster.getAnyWorker().execCmd(commands);
         log.info("Get sink info : {}", result.getStdout());
+        assertEquals(0, result.getExitCode());
         assertTrue(
             result.getStdout().contains("\"builtin\": \"" + tester.sinkType + 
"\""),
             result.getStdout()
@@ -164,9 +168,9 @@ protected void getSinkStatus(String tenant, String 
namespace, String sinkName) t
             "--name", sinkName
         };
         while (true) {
-            ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+            ContainerExecResult result = 
pulsarCluster.getAnyWorker().execCmd(commands);
             log.info("Get sink status : {}", result.getStdout());
-            if (result.getStdout().contains("\"running\": true")) {
+            if (0 == result.getExitCode() && 
result.getStdout().contains("\"running\": true")) {
                 return;
             }
             log.info("Backoff 1 second until the function is running");
@@ -211,9 +215,10 @@ protected void waitForProcessingMessages(String tenant,
         };
         Stopwatch stopwatch = Stopwatch.createStarted();
         while (true) {
-            ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+            ContainerExecResult result = 
pulsarCluster.getAnyWorker().execCmd(commands);
             log.info("Get sink status : {}", result.getStdout());
-            if (result.getStdout().contains("\"numProcessed\": \"" + 
numMessages + "\"")) {
+            if (0 == result.getExitCode()
+                && result.getStdout().contains("\"numProcessed\": \"" + 
numMessages + "\"")) {
                 return;
             }
             log.info("{} ms has elapsed but the sink hasn't process {} messages, backoff to wait for another 1 second",
@@ -231,7 +236,8 @@ protected void deleteSink(String tenant, String namespace, 
String sinkName) thro
             "--namespace", namespace,
             "--name", sinkName
         };
-        ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+        ContainerExecResult result = 
pulsarCluster.getAnyWorker().execCmd(commands);
+        assertEquals(0, result.getExitCode());
         assertTrue(
             result.getStdout().contains("Deleted successfully"),
             result.getStdout()
@@ -251,7 +257,8 @@ protected void getSinkInfoNotFound(String tenant, String 
namespace, String sinkN
             "--namespace", namespace,
             "--name", sinkName
         };
-        ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+        ContainerExecResult result = 
pulsarCluster.getAnyWorker().execCmd(commands);
+        assertNotEquals(0, result.getExitCode());
         assertTrue(result.getStderr().contains("Reason: Function " + sinkName 
+ " doesn't exist"));
     }
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSourceTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSourceTest.java
index 78d8874d4f..8468976e23 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSourceTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSourceTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.tests.integration.io;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertTrue;
 
 import com.google.common.base.Stopwatch;
@@ -34,11 +35,11 @@
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.apache.pulsar.tests.integration.functions.PulsarFunctionsTestBase;
 import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import 
org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec.PulsarClusterSpecBuilder;
-import org.testcontainers.containers.Container.ExecResult;
 import org.testcontainers.containers.GenericContainer;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Factory;
@@ -143,7 +144,8 @@ protected void submitSourceConnector(String tenant,
             "--destinationTopicName", outputTopicName
         };
         log.info("Run command : {}", StringUtils.join(commands, ' '));
-        ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+        ContainerExecResult result = 
pulsarCluster.getAnyWorker().execCmd(commands);
+        assertEquals(0, result.getExitCode());
         assertTrue(
             result.getStdout().contains("\"Created successfully\""),
             result.getStdout());
@@ -158,8 +160,9 @@ protected void getSourceInfoSuccess(String tenant, String namespace, String sour
             "--namespace", namespace,
             "--name", sourceName
         };
-        ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+        ContainerExecResult result = 
pulsarCluster.getAnyWorker().execCmd(commands);
         log.info("Get source info : {}", result.getStdout());
+        assertEquals(0, result.getExitCode());
         assertTrue(
             result.getStdout().contains("\"builtin\": \"" + tester.sourceType 
+ "\""),
             result.getStdout()
@@ -176,9 +179,9 @@ protected void getSourceStatus(String tenant, String namespace, String sourceNam
             "--name", sourceName
         };
         while (true) {
-            ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+            ContainerExecResult result = 
pulsarCluster.getAnyWorker().execCmd(commands);
             log.info("Get source status : {}", result.getStdout());
-            if (result.getStdout().contains("\"running\": true")) {
+            if (0 == result.getExitCode() && 
result.getStdout().contains("\"running\": true")) {
                 return;
             }
             log.info("Backoff 1 second until the function is running");
@@ -209,9 +212,10 @@ protected void waitForProcessingMessages(String tenant,
         };
         Stopwatch stopwatch = Stopwatch.createStarted();
         while (true) {
-            ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+            ContainerExecResult result = 
pulsarCluster.getAnyWorker().execCmd(commands);
             log.info("Get source status : {}", result.getStdout());
-            if (result.getStdout().contains("\"numProcessed\": \"" + 
numMessages + "\"")) {
+            if (0 == result.getExitCode()
+                && result.getStdout().contains("\"numProcessed\": \"" + 
numMessages + "\"")) {
                 return;
             }
             log.info("{} ms has elapsed but the source hasn't process {} 
messages, backoff to wait for another 1 second",
@@ -229,7 +233,8 @@ protected void deleteSource(String tenant, String namespace, String sourceName)
             "--namespace", namespace,
             "--name", sourceName
         };
-        ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+        ContainerExecResult result = 
pulsarCluster.getAnyWorker().execCmd(commands);
+        assertEquals(0, result.getExitCode());
         assertTrue(
             result.getStdout().contains("Delete source successfully"),
             result.getStdout()
@@ -249,7 +254,8 @@ protected void getSourceInfoNotFound(String tenant, String namespace, String sou
             "--namespace", namespace,
             "--name", sourceName
         };
-        ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+        ContainerExecResult result = 
pulsarCluster.getAnyWorker().execCmd(commands);
+        assertNotEquals(0, result.getExitCode());
         assertTrue(result.getStderr().contains("Reason: Function " + 
sourceName + " doesn't exist"));
     }
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/smoke/SmokeTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/smoke/SmokeTest.java
index 85a8d2db35..b22bd7476b 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/smoke/SmokeTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/smoke/SmokeTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.tests.integration.smoke;
 
+import static org.testng.Assert.assertEquals;
+
 import lombok.Cleanup;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -25,9 +27,8 @@
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
-import org.testcontainers.containers.Container;
-import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import java.util.concurrent.TimeUnit;
@@ -69,15 +70,17 @@ public void testPublishAndConsume(String serviceUrl) throws Exception {
         }
         for (int i = 0; i < 10; i++) {
             Message m = consumer.receive();
-            Assert.assertEquals("smoke-message" + i, new String(m.getData()));
+            assertEquals("smoke-message" + i, new String(m.getData()));
         }
     }
 
-    private Container.ExecResult createTenantName(String tenantName,
-                                                  String clusterName,
-                                                  String roleName) throws 
Exception {
-        return pulsarCluster.runAdminCommandOnAnyBroker(
+    private ContainerExecResult createTenantName(String tenantName,
+                                                 String clusterName,
+                                                 String roleName) throws 
Exception {
+        ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker(
             "tenants", "create", tenantName, "--allowed-clusters", clusterName,
             "--admin-roles", roleName);
+        assertEquals(0, result.getExitCode());
+        return result;
     }
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index bdd8123a0f..2f1653a9b4 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -43,7 +43,7 @@
 import org.apache.pulsar.tests.integration.containers.PulsarContainer;
 import org.apache.pulsar.tests.integration.containers.WorkerContainer;
 import org.apache.pulsar.tests.integration.containers.ZKContainer;
-import org.testcontainers.containers.Container.ExecResult;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.Network;
 
@@ -315,15 +315,15 @@ public WorkerContainer getAnyWorker() {
         return brokerContainers.values();
     }
 
-    public ExecResult runAdminCommandOnAnyBroker(String...commands) throws 
Exception {
+    public ContainerExecResult runAdminCommandOnAnyBroker(String...commands) 
throws Exception {
         return runCommandOnAnyBrokerWithScript(ADMIN_SCRIPT, commands);
     }
 
-    public ExecResult runPulsarBaseCommandOnAnyBroker(String...commands) 
throws Exception {
+    public ContainerExecResult 
runPulsarBaseCommandOnAnyBroker(String...commands) throws Exception {
         return runCommandOnAnyBrokerWithScript(PULSAR_COMMAND_SCRIPT, 
commands);
     }
 
-    private ExecResult runCommandOnAnyBrokerWithScript(String scriptType, 
String...commands) throws Exception {
+    private ContainerExecResult runCommandOnAnyBrokerWithScript(String 
scriptType, String...commands) throws Exception {
         BrokerContainer container = getAnyBroker();
         String[] cmds = new String[commands.length + 1];
         cmds[0] = scriptType;
@@ -339,13 +339,13 @@ public void startAllBrokers() {
         brokerContainers.values().forEach(BrokerContainer::start);
     }
 
-    public ExecResult createNamespace(String nsName) throws Exception {
+    public ContainerExecResult createNamespace(String nsName) throws Exception 
{
         return runAdminCommandOnAnyBroker(
             "namespaces", "create", "public/" + nsName,
             "--clusters", clusterName);
     }
 
-    public ExecResult enableDeduplication(String nsName, boolean enabled) 
throws Exception {
+    public ContainerExecResult enableDeduplication(String nsName, boolean 
enabled) throws Exception {
         return runAdminCommandOnAnyBroker(
             "namespaces", "set-deduplication", "public/" + nsName,
             enabled ? "--enable" : "--disable");
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java
index 51fb8d8790..8684201bb8 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.tests.integration.utils;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+
 import com.github.dockerjava.api.DockerClient;
 import com.github.dockerjava.api.async.ResultCallback;
 import com.github.dockerjava.api.command.InspectContainerResponse;
@@ -25,6 +27,7 @@
 import com.github.dockerjava.api.model.Frame;
 import com.github.dockerjava.api.model.ContainerNetwork;
 
+import com.github.dockerjava.api.model.StreamType;
 import java.io.Closeable;
 import java.io.File;
 import java.io.FileOutputStream;
@@ -34,16 +37,15 @@
 import java.util.Arrays;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.zip.GZIPOutputStream;
 
 import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
 
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -162,16 +164,26 @@ public static String getContainerIP(DockerClient docker, String containerId) {
         throw new IllegalArgumentException("Container " + containerId + " has 
no networks");
     }
 
-    public static String getContainerHostname(DockerClient docker, String 
containerId) {
-        return runCommand(docker, containerId, "hostname").trim();
+    public static ContainerExecResult runCommand(DockerClient docker, String 
containerId, String... cmd)
+            throws Exception {
+        return runCommand(docker, containerId, false, cmd);
     }
 
-    public static String runCommand(DockerClient docker, String containerId, 
String... cmd) {
+    public static ContainerExecResult runCommand(DockerClient docker,
+                                                 String containerId,
+                                                 boolean ignoreError,
+                                                 String... cmd)
+            throws Exception {
         CompletableFuture<Boolean> future = new CompletableFuture<>();
-        String execid = docker.execCreateCmd(containerId).withCmd(cmd)
-            .withAttachStderr(true).withAttachStdout(true).exec().getId();
+        String execid = docker.execCreateCmd(containerId)
+            .withCmd(cmd)
+            .withAttachStderr(true)
+            .withAttachStdout(true)
+            .exec()
+            .getId();
         String cmdString = Arrays.stream(cmd).collect(Collectors.joining(" "));
-        StringBuffer output = new StringBuffer();
+        StringBuilder stdout = new StringBuilder();
+        StringBuilder stderr = new StringBuilder();
         docker.execStartCmd(execid).withDetach(false)
             .exec(new ResultCallback<Frame>() {
                 @Override
@@ -185,7 +197,11 @@ public void onStart(Closeable closeable) {
                 @Override
                 public void onNext(Frame object) {
                     LOG.info("DOCKER.exec({}:{}): {}", containerId, cmdString, 
object);
-                    output.append(new String(object.getPayload()));
+                    if (StreamType.STDOUT == object.getStreamType()) {
+                        stdout.append(new String(object.getPayload(), UTF_8));
+                    } else if (StreamType.STDERR == object.getStreamType()) {
+                        stderr.append(new String(object.getPayload(), UTF_8));
+                    }
                 }
 
                 @Override
@@ -213,14 +229,22 @@ public void onComplete() {
         }
         int retCode = resp.getExitCode();
         if (retCode != 0) {
-            LOG.error("DOCKER.exec({}:{}): failed with {} : {}", containerId, 
cmdString, retCode, output);
-            throw new RuntimeException(
-                    String.format("cmd(%s) failed on %s with exitcode %d",
-                                  cmdString, containerId, retCode));
+            if (!ignoreError) {
+                LOG.error("DOCKER.exec({}:{}): failed with {} 
:\nStdout:\n{}\n\nStderr:\n{}",
+                    containerId, cmdString, retCode, stdout.toString(), 
stderr.toString());
+                throw new Exception(String.format("cmd(%s) failed on %s with 
exitcode %d",
+                    cmdString, containerId, retCode));
+            } else {
+                LOG.error("DOCKER.exec({}:{}): failed with {}", containerId, 
cmdString, retCode);
+            }
         } else {
             LOG.info("DOCKER.exec({}:{}): completed with {}", containerId, 
cmdString, retCode);
         }
-        return output.toString();
+        return ContainerExecResult.of(
+            retCode,
+            stdout.toString(),
+            stderr.toString()
+        );
     }
 
     public static Optional<String> getContainerCluster(DockerClient docker, 
String containerId) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to