This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 124dfaf   Move integration tests for function upload/download to 
apache (#2061)
124dfaf is described below

commit 124dfafeb3ab92f55ce5fb2de4220f3ea57a803c
Author: Sijie Guo <[email protected]>
AuthorDate: Mon Jul 2 00:17:40 2018 -0700

     Move integration tests for function upload/download to apache (#2061)
    
    This change add `function-worker` into the `PulsarClusterTestBase` to 
create a test base for functions related tests `PulsarFunctionsTestBase`.
    
    Move the existing upload/download tests to use it.
---
 docker/pulsar/Dockerfile                           |  3 +-
 docker/pulsar/scripts/gen-yml-from-env.py          | 65 +++++++++++++++++
 .../docker-images/latest-version-image/Dockerfile  |  4 +-
 .../conf/functions_worker.conf                     | 26 ++-----
 .../scripts/run-functions-worker.sh                | 26 +++----
 .../pulsar/tests/containers/PulsarContainer.java   |  9 +--
 .../pulsar/tests/containers/WorkerContainer.java   | 30 ++++++++
 .../pulsar/tests/topologies/PulsarCluster.java     | 52 +++++++++++--
 .../integration/functions/PulsarFunctionsTest.java | 85 ++++++++++++++++++++++
 .../functions/PulsarFunctionsTestBase.java         | 36 +++++++++
 .../utils/UploadDownloadCommandGenerator.java      | 74 +++++++++++++++++++
 11 files changed, 361 insertions(+), 49 deletions(-)

diff --git a/docker/pulsar/Dockerfile b/docker/pulsar/Dockerfile
index 87ef64f..508b4f3 100644
--- a/docker/pulsar/Dockerfile
+++ b/docker/pulsar/Dockerfile
@@ -20,7 +20,7 @@
 FROM openjdk:8-jdk
 
 # Install some utilities
-RUN apt-get update && apt-get install -y netcat dnsutils python-kazoo
+RUN apt-get update && apt-get install -y netcat dnsutils python-kazoo 
python-yaml
 
 ARG PULSAR_TARBALL
 
@@ -28,6 +28,7 @@ ADD ${PULSAR_TARBALL} /
 RUN mv /apache-pulsar-* /pulsar
 
 COPY scripts/apply-config-from-env.py /pulsar/bin
+COPY scripts/gen-yml-from-env.py /pulsar/bin
 COPY scripts/generate-zookeeper-config.sh /pulsar/bin
 COPY scripts/pulsar-zookeeper-ruok.sh /pulsar/bin
 COPY scripts/watch-znode.py /pulsar/bin
diff --git a/docker/pulsar/scripts/gen-yml-from-env.py 
b/docker/pulsar/scripts/gen-yml-from-env.py
new file mode 100755
index 0000000..28599a5
--- /dev/null
+++ b/docker/pulsar/scripts/gen-yml-from-env.py
@@ -0,0 +1,65 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+##
+## Generate a yml from env.py
+##
+## ./gen-yml-from-env.py <template yml file> [<template yml file>]
+##
+
+import os, sys
+import yaml
+
+if len(sys.argv) < 2:
+    print 'Usage: %s' % (sys.argv[0])
+    sys.exit(1)
+
+conf_files = sys.argv[1:]
+
+for conf_filename in conf_files:
+    conf = yaml.load(open(conf_filename))
+
+    # update the config
+    modified = False
+    for k in sorted(os.environ.keys()):
+        key_parts = k.split('_')
+        v = os.environ[k]
+
+        i = 0
+        conf_to_modify = conf
+        while i < len(key_parts):
+            key_part = key_parts[i]
+            if not key_part in conf_to_modify:
+                break
+
+            if i == (len(key_parts) - 1):
+                if key_part == 'workerPort':
+                    conf_to_modify[key_part] = int(v)
+                else:
+                    conf_to_modify[key_part] = v
+
+                modified = True
+            else:
+                conf_to_modify = conf_to_modify[key_part]
+            i += 1
+    # Store back the updated config in the same file
+    f = open(conf_filename , 'w')
+    yaml.dump(conf, f, default_flow_style=False)
+    f.close()
diff --git a/tests/docker-images/latest-version-image/Dockerfile 
b/tests/docker-images/latest-version-image/Dockerfile
index deb0676..e638ba0 100644
--- a/tests/docker-images/latest-version-image/Dockerfile
+++ b/tests/docker-images/latest-version-image/Dockerfile
@@ -24,12 +24,12 @@ RUN apt-get update && apt-get install -y supervisor
 RUN mkdir -p /var/log/pulsar && mkdir -p /var/run/supervisor/ && mkdir -p 
/pulsar/ssl
 
 COPY conf/supervisord.conf /etc/supervisord.conf
-COPY conf/global-zk.conf conf/local-zk.conf conf/bookie.conf conf/broker.conf \
+COPY conf/global-zk.conf conf/local-zk.conf conf/bookie.conf conf/broker.conf 
conf/functions_worker.conf \
      conf/proxy.conf /etc/supervisord/conf.d/
 
 COPY ssl/ca.cert.pem ssl/broker.key-pk8.pem ssl/broker.cert.pem \
      ssl/admin.key-pk8.pem ssl/admin.cert.pem /pulsar/ssl/
 
 COPY scripts/init-cluster.sh scripts/run-global-zk.sh scripts/run-local-zk.sh \
-     scripts/run-bookie.sh scripts/run-broker.sh scripts/run-proxy.sh 
/pulsar/bin/
+     scripts/run-bookie.sh scripts/run-broker.sh 
scripts/run-functions-worker.sh scripts/run-proxy.sh /pulsar/bin/
 
diff --git a/docker/pulsar/Dockerfile 
b/tests/docker-images/latest-version-image/conf/functions_worker.conf
similarity index 62%
copy from docker/pulsar/Dockerfile
copy to tests/docker-images/latest-version-image/conf/functions_worker.conf
index 87ef64f..637be95 100644
--- a/docker/pulsar/Dockerfile
+++ b/tests/docker-images/latest-version-image/conf/functions_worker.conf
@@ -17,23 +17,11 @@
 # under the License.
 #
 
-FROM openjdk:8-jdk
+[program:functions-worker]
+autostart=false
+redirect_stderr=true
+stdout_logfile=/var/log/pulsar/functions_worker.log
+directory=/pulsar
+environment=PULSAR_MEM=-Xms128M
+command=/pulsar/bin/pulsar functions-worker
 
-# Install some utilities
-RUN apt-get update && apt-get install -y netcat dnsutils python-kazoo
-
-ARG PULSAR_TARBALL
-
-ADD ${PULSAR_TARBALL} /
-RUN mv /apache-pulsar-* /pulsar
-
-COPY scripts/apply-config-from-env.py /pulsar/bin
-COPY scripts/generate-zookeeper-config.sh /pulsar/bin
-COPY scripts/pulsar-zookeeper-ruok.sh /pulsar/bin
-COPY scripts/watch-znode.py /pulsar/bin
-
-WORKDIR /pulsar
-
-VOLUME  ["/pulsar/conf", "/pulsar/data"]
-
-ENV PULSAR_ROOT_LOGGER=INFO,CONSOLE
diff --git a/docker/pulsar/Dockerfile 
b/tests/docker-images/latest-version-image/scripts/run-functions-worker.sh
old mode 100644
new mode 100755
similarity index 62%
copy from docker/pulsar/Dockerfile
copy to tests/docker-images/latest-version-image/scripts/run-functions-worker.sh
index 87ef64f..3fadf96
--- a/docker/pulsar/Dockerfile
+++ b/tests/docker-images/latest-version-image/scripts/run-functions-worker.sh
@@ -1,3 +1,4 @@
+#!/usr/bin/env bash
 #
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
@@ -17,23 +18,14 @@
 # under the License.
 #
 
-FROM openjdk:8-jdk
+bin/apply-config-from-env.py conf/client.conf && \
+    bin/gen-yml-from-env.py conf/functions_worker.yml && \
+    bin/apply-config-from-env.py conf/pulsar_env.sh
 
-# Install some utilities
-RUN apt-get update && apt-get install -y netcat dnsutils python-kazoo
+if [ -z "$NO_AUTOSTART" ]; then
+    sed -i 's/autostart=.*/autostart=true/' 
/etc/supervisord/conf.d/functions_worker.conf
+fi
 
-ARG PULSAR_TARBALL
+bin/watch-znode.py -z $zookeeperServers -p /initialized-$clusterName -w
+exec /usr/bin/supervisord -c /etc/supervisord.conf
 
-ADD ${PULSAR_TARBALL} /
-RUN mv /apache-pulsar-* /pulsar
-
-COPY scripts/apply-config-from-env.py /pulsar/bin
-COPY scripts/generate-zookeeper-config.sh /pulsar/bin
-COPY scripts/pulsar-zookeeper-ruok.sh /pulsar/bin
-COPY scripts/watch-znode.py /pulsar/bin
-
-WORKDIR /pulsar
-
-VOLUME  ["/pulsar/conf", "/pulsar/data"]
-
-ENV PULSAR_ROOT_LOGGER=INFO,CONSOLE
diff --git 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/PulsarContainer.java
 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/PulsarContainer.java
index eb0ae84..0a0b159 100644
--- 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/PulsarContainer.java
+++ 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/PulsarContainer.java
@@ -68,10 +68,9 @@ public abstract class PulsarContainer<SelfT extends 
PulsarContainer<SelfT>> exte
     @Override
     protected void configure() {
         if (httpPort > 0) {
-            addExposedPorts(
-                servicePort, httpPort
-            );
-        } else if (servicePort > 0) {
+            addExposedPorts(httpPort);
+        }
+        if (servicePort > 0) {
             addExposedPort(servicePort);
         }
     }
@@ -80,7 +79,7 @@ public abstract class PulsarContainer<SelfT extends 
PulsarContainer<SelfT>> exte
     public void start() {
         if (httpPort > 0 || servicePort > 0) {
             this.waitStrategy = new HostPortWaitStrategy()
-                .withStartupTimeout(Duration.of(60, SECONDS));
+                .withStartupTimeout(Duration.of(300, SECONDS));
         }
         this.withCreateContainerCmdModifier(createContainerCmd -> {
             createContainerCmd.withHostName(hostname);
diff --git 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/WorkerContainer.java
 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/WorkerContainer.java
new file mode 100644
index 0000000..df8a28e
--- /dev/null
+++ 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/WorkerContainer.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.containers;
+
+/**
+ * A pulsar container that runs functions worker.
+ */
+public class WorkerContainer extends PulsarContainer<WorkerContainer> {
+
+    public WorkerContainer(String clusterName, String hostname) {
+        super(
+            clusterName, hostname, hostname, "bin/run-functions-worker.sh", 
-1, BROKER_HTTP_PORT);
+    }
+}
diff --git 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
index 167908e..501606e 100644
--- 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
+++ 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
@@ -36,6 +36,7 @@ import org.apache.pulsar.tests.containers.BrokerContainer;
 import org.apache.pulsar.tests.containers.CSContainer;
 import org.apache.pulsar.tests.containers.ProxyContainer;
 import org.apache.pulsar.tests.containers.PulsarContainer;
+import org.apache.pulsar.tests.containers.WorkerContainer;
 import org.apache.pulsar.tests.containers.ZKContainer;
 import org.testcontainers.containers.Container.ExecResult;
 import org.testcontainers.containers.Network;
@@ -67,6 +68,7 @@ public class PulsarCluster {
     private final CSContainer csContainer;
     private final Map<String, BKContainer> bookieContainers;
     private final Map<String, BrokerContainer> brokerContainers;
+    private final Map<String, WorkerContainer> workerContainers;
     private final ProxyContainer proxyContainer;
 
     private PulsarCluster(PulsarClusterSpec spec) {
@@ -86,9 +88,11 @@ public class PulsarCluster {
             .withNetworkAliases(CSContainer.NAME);
         this.bookieContainers = Maps.newTreeMap();
         this.brokerContainers = Maps.newTreeMap();
+        this.workerContainers = Maps.newTreeMap();
         this.proxyContainer = new ProxyContainer(clusterName, "pulsar-proxy")
             .withNetwork(network)
             .withNetworkAliases("pulsar-proxy")
+            .withEnv("zkServers", ZKContainer.NAME)
             .withEnv("zookeeperServers", ZKContainer.NAME)
             .withEnv("configurationStoreServers", CSContainer.NAME + ":" + 
CS_PORT)
             .withEnv("clusterName", clusterName);
@@ -132,6 +136,7 @@ public class PulsarCluster {
             runNumContainers("broker", spec.numBrokers(), (name) -> new 
BrokerContainer(clusterName, name)
                 .withNetwork(network)
                 .withNetworkAliases(name)
+                .withEnv("zkServers", ZKContainer.NAME)
                 .withEnv("zookeeperServers", ZKContainer.NAME)
                 .withEnv("configurationStoreServers", CSContainer.NAME + ":" + 
CS_PORT)
                 .withEnv("clusterName", clusterName)
@@ -166,6 +171,7 @@ public class PulsarCluster {
 
     public void stop() {
         proxyContainer.stop();
+        workerContainers.values().forEach(WorkerContainer::stop);
         brokerContainers.values().forEach(BrokerContainer::stop);
         bookieContainers.values().forEach(BKContainer::stop);
         csContainer.stop();
@@ -177,12 +183,48 @@ public class PulsarCluster {
         }
     }
 
+    public void startFunctionWorkers(int numFunctionWorkers) {
+        String serviceUrl = "pulsar://pulsar-broker-0:" + 
PulsarContainer.BROKER_PORT;
+        String httpServiceUrl = "http://pulsar-broker-0:"; + 
PulsarContainer.BROKER_HTTP_PORT;
+        workerContainers.putAll(runNumContainers(
+            "functions-worker",
+            numFunctionWorkers,
+            (name) -> new WorkerContainer(clusterName, name)
+                .withNetwork(network)
+                .withNetworkAliases(name)
+                // worker settings
+                .withEnv("workerId", name)
+                .withEnv("workerHostname", name)
+                .withEnv("workerPort", "" + PulsarContainer.BROKER_HTTP_PORT)
+                .withEnv("pulsarFunctionsCluster", clusterName)
+                .withEnv("pulsarServiceUrl", serviceUrl)
+                .withEnv("pulsarWebServiceUrl", httpServiceUrl)
+                .withEnv("clusterName", clusterName)
+                // script
+                .withEnv("zookeeperServers", ZKContainer.NAME)
+                // bookkeeper tools
+                .withEnv("zkServers", ZKContainer.NAME)
+        ));
+    }
+
     public BrokerContainer getAnyBroker() {
-        List<BrokerContainer> brokerList = Lists.newArrayList();
-        brokerList.addAll(brokerContainers.values());
-        Collections.shuffle(brokerList);
-        checkArgument(!brokerList.isEmpty(), "No broker is alive");
-        return brokerList.get(0);
+        return getAnyContainer(brokerContainers, "broker");
+    }
+
+    public WorkerContainer getAnyWorker() {
+        return getAnyContainer(workerContainers, "functions-worker");
+    }
+
+    private <T> T getAnyContainer(Map<String, T> containers, String 
serviceName) {
+        List<T> containerList = Lists.newArrayList();
+        containerList.addAll(containers.values());
+        Collections.shuffle(containerList);
+        checkArgument(!containerList.isEmpty(), "No " + serviceName + " is 
alive");
+        return containerList.get(0);
+    }
+
+    public Collection<BrokerContainer> getBrokers() {
+        return brokerContainers.values();
     }
 
     public Collection<BrokerContainer> getBrokers() {
diff --git 
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
new file mode 100644
index 0000000..07d86f7
--- /dev/null
+++ 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.functions;
+
+import static org.testng.Assert.assertTrue;
+
+import com.google.common.io.Files;
+import java.io.File;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.tests.containers.WorkerContainer;
+import 
org.apache.pulsar.tests.integration.functions.utils.UploadDownloadCommandGenerator;
+import org.apache.pulsar.tests.topologies.PulsarCluster;
+import org.testcontainers.containers.Container.ExecResult;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class PulsarFunctionsTest extends PulsarFunctionsTestBase {
+
+    @Test
+    public String checkUpload() throws Exception {
+        String bkPkgPath = String.format("%s/%s/%s",
+            "tenant-" + randomName(8),
+            "ns-" + randomName(8),
+            "fn-" + randomName(8));
+
+        UploadDownloadCommandGenerator generator = 
UploadDownloadCommandGenerator.createUploader(
+            PulsarCluster.ADMIN_SCRIPT,
+            bkPkgPath);
+        String actualCommand = generator.generateCommand();
+
+        log.info(actualCommand);
+
+        String[] commands = {
+            "sh", "-c", actualCommand
+        };
+        ExecResult output = pulsarCluster.getAnyWorker().execCmd(commands);
+        assertTrue(output.getStdout().contains("\"Uploaded successfully\""));
+        return bkPkgPath;
+    }
+
+    @Test
+    public void checkDownload() throws Exception {
+        String bkPkgPath = checkUpload();
+        String localPkgFile = "/tmp/checkdownload-" + randomName(16);
+
+        UploadDownloadCommandGenerator generator = 
UploadDownloadCommandGenerator.createDownloader(
+                localPkgFile,
+                bkPkgPath);
+        String actualCommand = generator.generateCommand();
+
+        log.info(actualCommand);
+
+        String[] commands = {
+            "sh", "-c", actualCommand
+        };
+        WorkerContainer container = pulsarCluster.getAnyWorker();
+        ExecResult output = container.execCmd(commands);
+        assertTrue(output.getStdout().contains("\"Downloaded successfully\""));
+        String[] diffCommand = {
+            "diff",
+            PulsarCluster.ADMIN_SCRIPT,
+            localPkgFile
+        };
+        output = container.execCmd(diffCommand);
+        assertTrue(output.getStdout().isEmpty());
+        assertTrue(output.getStderr().isEmpty());
+    }
+
+}
diff --git 
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
new file mode 100644
index 0000000..8e692ad
--- /dev/null
+++ 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.functions;
+
+import org.apache.pulsar.tests.topologies.PulsarClusterTestBase;
+import org.testng.annotations.BeforeClass;
+
+/**
+ * A cluster to run pulsar functions for testing functions related features.
+ */
+public class PulsarFunctionsTestBase extends PulsarClusterTestBase  {
+
+    @BeforeClass
+    public static void setupCluster() throws Exception {
+        PulsarClusterTestBase.setupCluster();
+
+        pulsarCluster.startFunctionWorkers(1);
+    }
+
+}
diff --git 
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/utils/UploadDownloadCommandGenerator.java
 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/utils/UploadDownloadCommandGenerator.java
new file mode 100644
index 0000000..adc0228
--- /dev/null
+++ 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/utils/UploadDownloadCommandGenerator.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.functions.utils;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import org.apache.pulsar.tests.topologies.PulsarCluster;
+
+@Getter
+@Setter
+@ToString
+public class UploadDownloadCommandGenerator {
+    public enum MODE {
+        UPLOAD,
+        DOWNLOAD,
+    };
+    private MODE mode;
+    private String bkPath;
+    private String localPath;
+    private String brokerUrl;
+
+    public static UploadDownloadCommandGenerator createUploader(String 
localPath, String bkPath) {
+        return new UploadDownloadCommandGenerator(MODE.UPLOAD, localPath, 
bkPath);
+    }
+
+    public static UploadDownloadCommandGenerator createDownloader(String 
localPath, String bkPath) {
+        return new UploadDownloadCommandGenerator(MODE.DOWNLOAD, localPath, 
bkPath);
+    }
+
+    public UploadDownloadCommandGenerator(MODE mode, String localPath, String 
bkPath) {
+        this.mode = mode;
+        this.localPath = localPath;
+        this.bkPath = bkPath;
+    }
+
+    public void createBrokerUrl(String host, int port) {
+        brokerUrl = "pulsar://" + host + ":" + port;
+    }
+
+    public String generateCommand() {
+        StringBuilder commandBuilder = new 
StringBuilder().append(PulsarCluster.ADMIN_SCRIPT).append(" functions ");
+        if (mode == MODE.UPLOAD) {
+            commandBuilder.append(" upload ");
+        } else {
+            commandBuilder.append(" download ");
+        }
+        commandBuilder.append(" --path ");
+        commandBuilder.append(bkPath);
+        if (mode == MODE.UPLOAD) {
+            commandBuilder.append(" --sourceFile ");
+        } else {
+            commandBuilder.append(" --destinationFile ");
+        }
+        commandBuilder.append(localPath);
+        return commandBuilder.toString();
+    }
+}

Reply via email to