sijie closed pull request #2061:  Move integration tests for function 
upload/download to apache
URL: https://github.com/apache/incubator-pulsar/pull/2061
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docker/pulsar/Dockerfile b/docker/pulsar/Dockerfile
index 87ef64f17b..508b4f365e 100644
--- a/docker/pulsar/Dockerfile
+++ b/docker/pulsar/Dockerfile
@@ -20,7 +20,7 @@
 FROM openjdk:8-jdk
 
 # Install some utilities
-RUN apt-get update && apt-get install -y netcat dnsutils python-kazoo
+RUN apt-get update && apt-get install -y netcat dnsutils python-kazoo 
python-yaml
 
 ARG PULSAR_TARBALL
 
@@ -28,6 +28,7 @@ ADD ${PULSAR_TARBALL} /
 RUN mv /apache-pulsar-* /pulsar
 
 COPY scripts/apply-config-from-env.py /pulsar/bin
+COPY scripts/gen-yml-from-env.py /pulsar/bin
 COPY scripts/generate-zookeeper-config.sh /pulsar/bin
 COPY scripts/pulsar-zookeeper-ruok.sh /pulsar/bin
 COPY scripts/watch-znode.py /pulsar/bin
diff --git a/docker/pulsar/scripts/gen-yml-from-env.py 
b/docker/pulsar/scripts/gen-yml-from-env.py
new file mode 100755
index 0000000000..28599a54b9
--- /dev/null
+++ b/docker/pulsar/scripts/gen-yml-from-env.py
@@ -0,0 +1,65 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+##
+## Generate a yml from env.py
+##
+## ./gen-yml-from-env.py <template yml file> [<template yml file>]
+##
+
+import os, sys
+import yaml
+
+if len(sys.argv) < 2:
+    print 'Usage: %s' % (sys.argv[0])
+    sys.exit(1)
+
+conf_files = sys.argv[1:]
+
+for conf_filename in conf_files:
+    conf = yaml.load(open(conf_filename))
+
+    # update the config
+    modified = False
+    for k in sorted(os.environ.keys()):
+        key_parts = k.split('_')
+        v = os.environ[k]
+
+        i = 0
+        conf_to_modify = conf
+        while i < len(key_parts):
+            key_part = key_parts[i]
+            if not key_part in conf_to_modify:
+                break
+
+            if i == (len(key_parts) - 1):
+                if key_part == 'workerPort':
+                    conf_to_modify[key_part] = int(v)
+                else:
+                    conf_to_modify[key_part] = v
+
+                modified = True
+            else:
+                conf_to_modify = conf_to_modify[key_part]
+            i += 1
+    # Store back the updated config in the same file
+    f = open(conf_filename , 'w')
+    yaml.dump(conf, f, default_flow_style=False)
+    f.close()
diff --git a/tests/docker-images/latest-version-image/Dockerfile 
b/tests/docker-images/latest-version-image/Dockerfile
index deb0676fd0..e638ba0851 100644
--- a/tests/docker-images/latest-version-image/Dockerfile
+++ b/tests/docker-images/latest-version-image/Dockerfile
@@ -24,12 +24,12 @@ RUN apt-get update && apt-get install -y supervisor
 RUN mkdir -p /var/log/pulsar && mkdir -p /var/run/supervisor/ && mkdir -p 
/pulsar/ssl
 
 COPY conf/supervisord.conf /etc/supervisord.conf
-COPY conf/global-zk.conf conf/local-zk.conf conf/bookie.conf conf/broker.conf \
+COPY conf/global-zk.conf conf/local-zk.conf conf/bookie.conf conf/broker.conf 
conf/functions_worker.conf \
      conf/proxy.conf /etc/supervisord/conf.d/
 
 COPY ssl/ca.cert.pem ssl/broker.key-pk8.pem ssl/broker.cert.pem \
      ssl/admin.key-pk8.pem ssl/admin.cert.pem /pulsar/ssl/
 
 COPY scripts/init-cluster.sh scripts/run-global-zk.sh scripts/run-local-zk.sh \
-     scripts/run-bookie.sh scripts/run-broker.sh scripts/run-proxy.sh 
/pulsar/bin/
+     scripts/run-bookie.sh scripts/run-broker.sh 
scripts/run-functions-worker.sh scripts/run-proxy.sh /pulsar/bin/
 
diff --git 
a/tests/docker-images/latest-version-image/conf/functions_worker.conf 
b/tests/docker-images/latest-version-image/conf/functions_worker.conf
new file mode 100644
index 0000000000..637be95e1d
--- /dev/null
+++ b/tests/docker-images/latest-version-image/conf/functions_worker.conf
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+[program:functions-worker]
+autostart=false
+redirect_stderr=true
+stdout_logfile=/var/log/pulsar/functions_worker.log
+directory=/pulsar
+environment=PULSAR_MEM=-Xms128M
+command=/pulsar/bin/pulsar functions-worker
+
diff --git 
a/tests/docker-images/latest-version-image/scripts/run-functions-worker.sh 
b/tests/docker-images/latest-version-image/scripts/run-functions-worker.sh
new file mode 100755
index 0000000000..3fadf960ee
--- /dev/null
+++ b/tests/docker-images/latest-version-image/scripts/run-functions-worker.sh
@@ -0,0 +1,31 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+bin/apply-config-from-env.py conf/client.conf && \
+    bin/gen-yml-from-env.py conf/functions_worker.yml && \
+    bin/apply-config-from-env.py conf/pulsar_env.sh
+
+if [ -z "$NO_AUTOSTART" ]; then
+    sed -i 's/autostart=.*/autostart=true/' 
/etc/supervisord/conf.d/functions_worker.conf
+fi
+
+bin/watch-znode.py -z $zookeeperServers -p /initialized-$clusterName -w
+exec /usr/bin/supervisord -c /etc/supervisord.conf
+
diff --git 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/PulsarContainer.java
 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/PulsarContainer.java
index eb0ae840d1..0a0b1590b8 100644
--- 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/PulsarContainer.java
+++ 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/PulsarContainer.java
@@ -68,10 +68,9 @@ public String getContainerName() {
     @Override
     protected void configure() {
         if (httpPort > 0) {
-            addExposedPorts(
-                servicePort, httpPort
-            );
-        } else if (servicePort > 0) {
+            addExposedPorts(httpPort);
+        }
+        if (servicePort > 0) {
             addExposedPort(servicePort);
         }
     }
@@ -80,7 +79,7 @@ protected void configure() {
     public void start() {
         if (httpPort > 0 || servicePort > 0) {
             this.waitStrategy = new HostPortWaitStrategy()
-                .withStartupTimeout(Duration.of(60, SECONDS));
+                .withStartupTimeout(Duration.of(300, SECONDS));
         }
         this.withCreateContainerCmdModifier(createContainerCmd -> {
             createContainerCmd.withHostName(hostname);
diff --git 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/WorkerContainer.java
 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/WorkerContainer.java
new file mode 100644
index 0000000000..df8a28e87d
--- /dev/null
+++ 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/WorkerContainer.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.containers;
+
+/**
+ * A pulsar container that runs functions worker.
+ */
+public class WorkerContainer extends PulsarContainer<WorkerContainer> {
+
+    public WorkerContainer(String clusterName, String hostname) {
+        super(
+            clusterName, hostname, hostname, "bin/run-functions-worker.sh", 
-1, BROKER_HTTP_PORT);
+    }
+}
diff --git 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
index 9e2860f233..a721915d62 100644
--- 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
+++ 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
@@ -23,6 +23,7 @@
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -35,6 +36,7 @@
 import org.apache.pulsar.tests.containers.CSContainer;
 import org.apache.pulsar.tests.containers.ProxyContainer;
 import org.apache.pulsar.tests.containers.PulsarContainer;
+import org.apache.pulsar.tests.containers.WorkerContainer;
 import org.apache.pulsar.tests.containers.ZKContainer;
 import org.testcontainers.containers.Container.ExecResult;
 import org.testcontainers.containers.Network;
@@ -45,7 +47,8 @@
 @Slf4j
 public class PulsarCluster {
 
-    protected static final String ADMIN_SCRIPT = "/pulsar/bin/pulsar-admin";
+    public static final String ADMIN_SCRIPT = "/pulsar/bin/pulsar-admin";
+    public static final String CLIENT_SCRIPT = "/pulsar/bin/pulsar-client";
 
     /**
      * Pulsar Cluster Spec.
@@ -65,6 +68,7 @@ public static PulsarCluster forSpec(PulsarClusterSpec spec) {
     private final CSContainer csContainer;
     private final Map<String, BKContainer> bookieContainers;
     private final Map<String, BrokerContainer> brokerContainers;
+    private final Map<String, WorkerContainer> workerContainers;
     private final ProxyContainer proxyContainer;
 
     private PulsarCluster(PulsarClusterSpec spec) {
@@ -84,9 +88,11 @@ private PulsarCluster(PulsarClusterSpec spec) {
             .withNetworkAliases(CSContainer.NAME);
         this.bookieContainers = Maps.newTreeMap();
         this.brokerContainers = Maps.newTreeMap();
+        this.workerContainers = Maps.newTreeMap();
         this.proxyContainer = new ProxyContainer(clusterName, "pulsar-proxy")
             .withNetwork(network)
             .withNetworkAliases("pulsar-proxy")
+            .withEnv("zkServers", ZKContainer.NAME)
             .withEnv("zookeeperServers", ZKContainer.NAME)
             .withEnv("configurationStoreServers", CSContainer.NAME + ":" + 
CS_PORT)
             .withEnv("clusterName", clusterName);
@@ -130,6 +136,7 @@ public void start() throws Exception {
             runNumContainers("broker", spec.numBrokers(), (name) -> new 
BrokerContainer(clusterName, name)
                 .withNetwork(network)
                 .withNetworkAliases(name)
+                .withEnv("zkServers", ZKContainer.NAME)
                 .withEnv("zookeeperServers", ZKContainer.NAME)
                 .withEnv("configurationStoreServers", CSContainer.NAME + ":" + 
CS_PORT)
                 .withEnv("clusterName", clusterName)
@@ -164,6 +171,7 @@ public void start() throws Exception {
 
     public void stop() {
         proxyContainer.stop();
+        workerContainers.values().forEach(WorkerContainer::stop);
         brokerContainers.values().forEach(BrokerContainer::stop);
         bookieContainers.values().forEach(BKContainer::stop);
         csContainer.stop();
@@ -175,12 +183,48 @@ public void stop() {
         }
     }
 
+    public void startFunctionWorkers(int numFunctionWorkers) {
+        String serviceUrl = "pulsar://pulsar-broker-0:" + 
PulsarContainer.BROKER_PORT;
+        String httpServiceUrl = "http://pulsar-broker-0:"; + 
PulsarContainer.BROKER_HTTP_PORT;
+        workerContainers.putAll(runNumContainers(
+            "functions-worker",
+            numFunctionWorkers,
+            (name) -> new WorkerContainer(clusterName, name)
+                .withNetwork(network)
+                .withNetworkAliases(name)
+                // worker settings
+                .withEnv("workerId", name)
+                .withEnv("workerHostname", name)
+                .withEnv("workerPort", "" + PulsarContainer.BROKER_HTTP_PORT)
+                .withEnv("pulsarFunctionsCluster", clusterName)
+                .withEnv("pulsarServiceUrl", serviceUrl)
+                .withEnv("pulsarWebServiceUrl", httpServiceUrl)
+                .withEnv("clusterName", clusterName)
+                // script
+                .withEnv("zookeeperServers", ZKContainer.NAME)
+                // bookkeeper tools
+                .withEnv("zkServers", ZKContainer.NAME)
+        ));
+    }
+
     public BrokerContainer getAnyBroker() {
-        List<BrokerContainer> brokerList = Lists.newArrayList();
-        brokerList.addAll(brokerContainers.values());
-        Collections.shuffle(brokerList);
-        checkArgument(!brokerList.isEmpty(), "No broker is alive");
-        return brokerList.get(0);
+        return getAnyContainer(brokerContainers, "broker");
+    }
+
+    public WorkerContainer getAnyWorker() {
+        return getAnyContainer(workerContainers, "functions-worker");
+    }
+
+    private <T> T getAnyContainer(Map<String, T> containers, String 
serviceName) {
+        List<T> containerList = Lists.newArrayList();
+        containerList.addAll(containers.values());
+        Collections.shuffle(containerList);
+        checkArgument(!containerList.isEmpty(), "No " + serviceName + " is 
alive");
+        return containerList.get(0);
+    }
+
+    public Collection<BrokerContainer> getBrokers() {
+        return brokerContainers.values();
     }
 
     public ExecResult runAdminCommandOnAnyBroker(String...commands) throws 
Exception {
diff --git a/tests/integration/cli/pom.xml b/tests/integration/cli/pom.xml
index 9d960ac3d1..5a05b4ef12 100644
--- a/tests/integration/cli/pom.xml
+++ b/tests/integration/cli/pom.xml
@@ -27,9 +27,8 @@
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.pulsar.tests</groupId>
-    <artifactId>integration-tests-base</artifactId>
+    <artifactId>integration</artifactId>
     <version>2.2.0-incubating-SNAPSHOT</version>
-    <relativePath>../../integration-tests-base</relativePath>
   </parent>
 
   <groupId>org.apache.pulsar.tests.integration</groupId>
@@ -42,6 +41,13 @@
       <groupId>org.apache.pulsar</groupId>
       <artifactId>pulsar-client</artifactId>
       <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pulsar.tests</groupId>
+      <artifactId>integration-tests-topologies</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
     </dependency>
   </dependencies>
 </project>
diff --git 
a/tests/integration/cli/src/test/java/org/apache/pulsar/tests/integration/TestCLI.java
 
b/tests/integration/cli/src/test/java/org/apache/pulsar/tests/integration/TestCLI.java
index e4a67c1db3..fdb19cd980 100644
--- 
a/tests/integration/cli/src/test/java/org/apache/pulsar/tests/integration/TestCLI.java
+++ 
b/tests/integration/cli/src/test/java/org/apache/pulsar/tests/integration/TestCLI.java
@@ -18,47 +18,41 @@
  */
 package org.apache.pulsar.tests.integration;
 
-import static org.testng.Assert.fail;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
 
-import com.github.dockerjava.api.DockerClient;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.pulsar.tests.DockerUtils;
-import org.apache.pulsar.tests.PulsarClusterUtils;
-import org.jboss.arquillian.test.api.ArquillianResource;
-import org.jboss.arquillian.testng.Arquillian;
+import org.apache.pulsar.tests.containers.BrokerContainer;
+import org.apache.pulsar.tests.topologies.PulsarCluster;
+import org.apache.pulsar.tests.topologies.PulsarClusterTestBase;
+import org.testcontainers.containers.Container.ExecResult;
 import org.testng.Assert;
-import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-public class TestCLI extends Arquillian {
-    private static String clusterName = "test";
-
-    @ArquillianResource
-    DockerClient docker;
-
-    @BeforeMethod
-    public void waitServicesUp() throws Exception {
-        Assert.assertTrue(PulsarClusterUtils.waitZooKeeperUp(docker, 
clusterName, 30, TimeUnit.SECONDS));
-        Assert.assertTrue(PulsarClusterUtils.waitAllBrokersUp(docker, 
clusterName));
-    }
+/**
+ * Test Pulsar CLI.
+ */
+public class TestCLI extends PulsarClusterTestBase {
 
     @Test
     public void testDeprecatedCommands() throws Exception {
-        String broker = PulsarClusterUtils.brokerSet(docker, 
clusterName).stream().findAny().get();
-
-        Assert.assertFalse(DockerUtils.runCommand(docker, broker, 
"/pulsar/bin/pulsar-admin", "--help")
-                           .contains("Usage: properties "));
-        Assert.assertTrue(DockerUtils.runCommand(docker, broker,
-                                                 "/pulsar/bin/pulsar-admin", 
"properties",
-                                                 "create", 
"compaction-test-cli", "--allowed-clusters", clusterName,
-                                                 "--admin-roles", 
"admin").contains("deprecated"));
-        Assert.assertTrue(DockerUtils.runCommand(docker, broker, 
"/pulsar/bin/pulsar-admin", "properties", "list")
-                          .contains("compaction-test-cli"));
-        Assert.assertTrue(DockerUtils.runCommand(docker, broker, 
"/pulsar/bin/pulsar-admin", "tenants", "list")
-                          .contains("compaction-test-cli"));
-
+        String tenantName = "test-deprecated-commands";
+
+        ExecResult result = pulsarCluster.runAdminCommandOnAnyBroker("--help");
+        assertFalse(result.getStdout().isEmpty());
+        assertFalse(result.getStdout().contains("Usage: properties "));
+        result = pulsarCluster.runAdminCommandOnAnyBroker(
+            "properties", "create", tenantName,
+            "--allowed-clusters", pulsarCluster.getClusterName(),
+            "--admin-roles", "admin"
+        );
+        assertTrue(result.getStderr().contains("deprecated"));
+
+        result = pulsarCluster.runAdminCommandOnAnyBroker(
+            "properties", "list");
+        assertTrue(result.getStdout().contains(tenantName));
+        result = pulsarCluster.runAdminCommandOnAnyBroker(
+            "tenants", "list");
+        assertTrue(result.getStdout().contains(tenantName));
     }
 
     @Test
@@ -68,119 +62,106 @@ public void testCreateSubscriptionCommand() throws 
Exception {
         String subscriptionPrefix = "subscription-";
 
         int i = 0;
-        for (String b : PulsarClusterUtils.brokerSet(docker, clusterName)) {
-            Assert.assertTrue(
-                DockerUtils.runCommand(docker, b,
-                    "/pulsar/bin/pulsar-admin",
-                    "persistent",
-                    "create-subscription",
-                    "persistent://public/default/" + topic,
-                    "--subscription",
-                    subscriptionPrefix + i
-                ).isEmpty()
+        for (BrokerContainer container : pulsarCluster.getBrokers()) {
+            ExecResult result = container.execCmd(
+                PulsarCluster.ADMIN_SCRIPT,
+                "persistent",
+                "create-subscription",
+                "persistent://public/default/" + topic,
+                "--subscription",
+                "" + subscriptionPrefix + i
             );
+            assertTrue(result.getStdout().isEmpty());
+            assertTrue(result.getStderr().isEmpty());
             i++;
         }
     }
 
     @Test
     public void testTopicTerminationOnTopicsWithoutConnectedConsumers() throws 
Exception {
-        String broker = PulsarClusterUtils.brokerSet(docker, 
clusterName).stream().findAny().get();
-
-        Assert.assertTrue(DockerUtils.runCommand(
-            docker, broker,
-            "/pulsar/bin/pulsar-client",
+        String topicName = 
"persistent://public/default/test-topic-termination";
+        BrokerContainer container = pulsarCluster.getAnyBroker();
+        ExecResult result = container.execCmd(
+            PulsarCluster.CLIENT_SCRIPT,
             "produce",
             "-m",
             "\"test topic termination\"",
             "-n",
             "1",
-            "persistent://public/default/test-topic-termination"
-        ).contains("1 messages successfully produced"));
+            topicName);
+
+        Assert.assertTrue(result.getStdout().contains("1 messages successfully 
produced"));
 
         // terminate the topic
-        Assert.assertTrue(DockerUtils.runCommand(
-            docker, broker,
-            "/pulsar/bin/pulsar-admin",
+        result = container.execCmd(
+            PulsarCluster.ADMIN_SCRIPT,
             "persistent",
             "terminate",
-            "persistent://public/default/test-topic-termination"
-        ).contains("Topic succesfully terminated at"));
+            topicName);
+        Assert.assertTrue(result.getStdout().contains("Topic succesfully 
terminated at"));
 
         // try to produce should fail
-
-        try {
-            DockerUtils.runCommand(
-                docker, broker,
-                "/pulsar/bin/pulsar-client",
-                "produce",
-                "-m",
-                "\"test topic termination\"",
-                "-n",
-                "1",
-                "persistent://public/default/test-topic-termination"
-            );
-            fail("Should fail to produce messages to a terminated topic");
-        } catch (RuntimeException re) {
-            // expected
-        }
+        result = pulsarCluster.getAnyBroker().execCmd(
+            PulsarCluster.CLIENT_SCRIPT,
+            "produce",
+            "-m",
+            "\"test topic termination\"",
+            "-n",
+            "1",
+            topicName);
+        assertTrue(result.getStdout().contains("Topic was already 
terminated"));
     }
 
     @Test
     public void testSchemaCLI() throws Exception {
-        String broker = PulsarClusterUtils.brokerSet(docker, 
clusterName).stream().findAny().get();
+        BrokerContainer container = pulsarCluster.getAnyBroker();
+        String topicName = "persistent://public/default/test-schema-cli";
 
-        Assert.assertTrue(DockerUtils.runCommand(
-            docker, broker,
-            "/pulsar/bin/pulsar-client",
+        ExecResult result = container.execCmd(
+            PulsarCluster.CLIENT_SCRIPT,
             "produce",
             "-m",
             "\"test topic schema\"",
             "-n",
             "1",
-            "persistent://public/default/test-schema-cli"
-        ).contains("1 messages successfully produced"));
+            topicName);
+        Assert.assertTrue(result.getStdout().contains("1 messages successfully 
produced"));
 
-        Assert.assertTrue(DockerUtils.runCommand(
-            docker, broker,
-            "/pulsar/bin/pulsar-admin",
+        result = container.execCmd(
+            PulsarCluster.ADMIN_SCRIPT,
             "schemas",
             "upload",
-            "persistent://public/default/test-schema-cli",
+            topicName,
             "-f",
             "/pulsar/conf/schema_example.conf"
-        ).isEmpty());
+        );
+        Assert.assertTrue(result.getStdout().isEmpty());
+        Assert.assertTrue(result.getStderr().isEmpty());
 
         // get schema
-        Assert.assertTrue(DockerUtils.runCommand(
-            docker, broker,
-            "/pulsar/bin/pulsar-admin",
+        result = container.execCmd(
+            PulsarCluster.ADMIN_SCRIPT,
             "schemas",
             "get",
-            "persistent://public/default/test-schema-cli"
-        ).contains("\"type\" : \"STRING\""));
+            topicName);
+        Assert.assertTrue(result.getStdout().contains("\"type\" : 
\"STRING\""));
 
         // delete the schema
-        Assert.assertTrue(DockerUtils.runCommand(
-            docker, broker,
-            "/pulsar/bin/pulsar-admin",
+        result = container.execCmd(
+            PulsarCluster.ADMIN_SCRIPT,
             "schemas",
             "delete",
-            "persistent://public/default/test-schema-cli"
-        ).isEmpty());
+            topicName);
+        Assert.assertTrue(result.getStdout().isEmpty());
+        Assert.assertTrue(result.getStderr().isEmpty());
 
         // get schema again
-        try {
-            DockerUtils.runCommand(
-                docker, broker,
-                "/pulsar/bin/pulsar-admin",
-                "schemas",
-                "get",
-                "persistent://public/default/test-schema-cli"
-            );
-            fail("Should fail to get schema if the schema is deleted");
-        } catch (RuntimeException re) {
-            // expected
-        }
+        result = container.execCmd(
+            PulsarCluster.ADMIN_SCRIPT,
+            "schemas",
+            "get",
+            "persistent://public/default/test-schema-cli"
+        );
+        assertTrue(result.getStderr().contains("Reason: HTTP 404 Not Found"));
     }
 }
diff --git a/tests/integration/cli/src/test/resources/arquillian.xml 
b/tests/integration/cli/src/test/resources/arquillian.xml
deleted file mode 100644
index 821d6e6e09..0000000000
--- a/tests/integration/cli/src/test/resources/arquillian.xml
+++ /dev/null
@@ -1,32 +0,0 @@
-<?xml version="1.0"?>
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<arquillian xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
-            xmlns="http://jboss.org/schema/arquillian";
-            xsi:schemaLocation="http://jboss.org/schema/arquillian
-                                
http://jboss.org/schema/arquillian/arquillian_1_0.xsd";>
-
-  <extension qualifier="docker">
-    <property name="definitionFormat">CUBE</property>
-    <property 
name="dockerContainersResource">cube-definitions/single-cluster-3-bookie-2-broker.yaml</property>
-  </extension>
-
-</arquillian>
diff --git 
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
new file mode 100644
index 0000000000..07d86f7f0c
--- /dev/null
+++ 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.functions;
+
+import static org.testng.Assert.assertTrue;
+
+import com.google.common.io.Files;
+import java.io.File;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.tests.containers.WorkerContainer;
+import 
org.apache.pulsar.tests.integration.functions.utils.UploadDownloadCommandGenerator;
+import org.apache.pulsar.tests.topologies.PulsarCluster;
+import org.testcontainers.containers.Container.ExecResult;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class PulsarFunctionsTest extends PulsarFunctionsTestBase {
+
+    @Test
+    public String checkUpload() throws Exception {
+        String bkPkgPath = String.format("%s/%s/%s",
+            "tenant-" + randomName(8),
+            "ns-" + randomName(8),
+            "fn-" + randomName(8));
+
+        UploadDownloadCommandGenerator generator = 
UploadDownloadCommandGenerator.createUploader(
+            PulsarCluster.ADMIN_SCRIPT,
+            bkPkgPath);
+        String actualCommand = generator.generateCommand();
+
+        log.info(actualCommand);
+
+        String[] commands = {
+            "sh", "-c", actualCommand
+        };
+        ExecResult output = pulsarCluster.getAnyWorker().execCmd(commands);
+        assertTrue(output.getStdout().contains("\"Uploaded successfully\""));
+        return bkPkgPath;
+    }
+
+    @Test
+    public void checkDownload() throws Exception {
+        String bkPkgPath = checkUpload();
+        String localPkgFile = "/tmp/checkdownload-" + randomName(16);
+
+        UploadDownloadCommandGenerator generator = 
UploadDownloadCommandGenerator.createDownloader(
+                localPkgFile,
+                bkPkgPath);
+        String actualCommand = generator.generateCommand();
+
+        log.info(actualCommand);
+
+        String[] commands = {
+            "sh", "-c", actualCommand
+        };
+        WorkerContainer container = pulsarCluster.getAnyWorker();
+        ExecResult output = container.execCmd(commands);
+        assertTrue(output.getStdout().contains("\"Downloaded successfully\""));
+        String[] diffCommand = {
+            "diff",
+            PulsarCluster.ADMIN_SCRIPT,
+            localPkgFile
+        };
+        output = container.execCmd(diffCommand);
+        assertTrue(output.getStdout().isEmpty());
+        assertTrue(output.getStderr().isEmpty());
+    }
+
+}
diff --git 
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
new file mode 100644
index 0000000000..8e692ad7b0
--- /dev/null
+++ 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.functions;
+
+import org.apache.pulsar.tests.topologies.PulsarClusterTestBase;
+import org.testng.annotations.BeforeClass;
+
+/**
+ * A cluster to run pulsar functions for testing functions related features.
+ */
+public class PulsarFunctionsTestBase extends PulsarClusterTestBase  {
+
+    @BeforeClass
+    public static void setupCluster() throws Exception {
+        PulsarClusterTestBase.setupCluster();
+
+        pulsarCluster.startFunctionWorkers(1);
+    }
+
+}
diff --git 
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/utils/UploadDownloadCommandGenerator.java
 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/utils/UploadDownloadCommandGenerator.java
new file mode 100644
index 0000000000..adc0228a91
--- /dev/null
+++ 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/utils/UploadDownloadCommandGenerator.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.functions.utils;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import org.apache.pulsar.tests.topologies.PulsarCluster;
+
+@Getter
+@Setter
+@ToString
+public class UploadDownloadCommandGenerator {
+    public enum MODE {
+        UPLOAD,
+        DOWNLOAD,
+    };
+    private MODE mode;
+    private String bkPath;
+    private String localPath;
+    private String brokerUrl;
+
+    public static UploadDownloadCommandGenerator createUploader(String 
localPath, String bkPath) {
+        return new UploadDownloadCommandGenerator(MODE.UPLOAD, localPath, 
bkPath);
+    }
+
+    public static UploadDownloadCommandGenerator createDownloader(String 
localPath, String bkPath) {
+        return new UploadDownloadCommandGenerator(MODE.DOWNLOAD, localPath, 
bkPath);
+    }
+
+    public UploadDownloadCommandGenerator(MODE mode, String localPath, String 
bkPath) {
+        this.mode = mode;
+        this.localPath = localPath;
+        this.bkPath = bkPath;
+    }
+
+    public void createBrokerUrl(String host, int port) {
+        brokerUrl = "pulsar://" + host + ":" + port;
+    }
+
+    public String generateCommand() {
+        StringBuilder commandBuilder = new 
StringBuilder().append(PulsarCluster.ADMIN_SCRIPT).append(" functions ");
+        if (mode == MODE.UPLOAD) {
+            commandBuilder.append(" upload ");
+        } else {
+            commandBuilder.append(" download ");
+        }
+        commandBuilder.append(" --path ");
+        commandBuilder.append(bkPath);
+        if (mode == MODE.UPLOAD) {
+            commandBuilder.append(" --sourceFile ");
+        } else {
+            commandBuilder.append(" --destinationFile ");
+        }
+        commandBuilder.append(localPath);
+        return commandBuilder.toString();
+    }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to