This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 124dfaf Move integration tests for function upload/download to
apache (#2061)
124dfaf is described below
commit 124dfafeb3ab92f55ce5fb2de4220f3ea57a803c
Author: Sijie Guo <[email protected]>
AuthorDate: Mon Jul 2 00:17:40 2018 -0700
Move integration tests for function upload/download to apache (#2061)
This change adds `function-worker` to the `PulsarClusterTestBase` to
create a test base for functions-related tests, `PulsarFunctionsTestBase`.
It also moves the existing upload/download tests to use it.
---
docker/pulsar/Dockerfile | 3 +-
docker/pulsar/scripts/gen-yml-from-env.py | 65 +++++++++++++++++
.../docker-images/latest-version-image/Dockerfile | 4 +-
.../conf/functions_worker.conf | 26 ++-----
.../scripts/run-functions-worker.sh | 26 +++----
.../pulsar/tests/containers/PulsarContainer.java | 9 +--
.../pulsar/tests/containers/WorkerContainer.java | 30 ++++++++
.../pulsar/tests/topologies/PulsarCluster.java | 52 +++++++++++--
.../integration/functions/PulsarFunctionsTest.java | 85 ++++++++++++++++++++++
.../functions/PulsarFunctionsTestBase.java | 36 +++++++++
.../utils/UploadDownloadCommandGenerator.java | 74 +++++++++++++++++++
11 files changed, 361 insertions(+), 49 deletions(-)
diff --git a/docker/pulsar/Dockerfile b/docker/pulsar/Dockerfile
index 87ef64f..508b4f3 100644
--- a/docker/pulsar/Dockerfile
+++ b/docker/pulsar/Dockerfile
@@ -20,7 +20,7 @@
FROM openjdk:8-jdk
# Install some utilities
-RUN apt-get update && apt-get install -y netcat dnsutils python-kazoo
+RUN apt-get update && apt-get install -y netcat dnsutils python-kazoo
python-yaml
ARG PULSAR_TARBALL
@@ -28,6 +28,7 @@ ADD ${PULSAR_TARBALL} /
RUN mv /apache-pulsar-* /pulsar
COPY scripts/apply-config-from-env.py /pulsar/bin
+COPY scripts/gen-yml-from-env.py /pulsar/bin
COPY scripts/generate-zookeeper-config.sh /pulsar/bin
COPY scripts/pulsar-zookeeper-ruok.sh /pulsar/bin
COPY scripts/watch-znode.py /pulsar/bin
diff --git a/docker/pulsar/scripts/gen-yml-from-env.py
b/docker/pulsar/scripts/gen-yml-from-env.py
new file mode 100755
index 0000000..28599a5
--- /dev/null
+++ b/docker/pulsar/scripts/gen-yml-from-env.py
@@ -0,0 +1,65 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+##
+## Update yml file(s) in place from environment variables
+##
+## ./gen-yml-from-env.py <template yml file> [<template yml file>]
+##
+
+import os, sys
+import yaml
+
+if len(sys.argv) < 2:
+ print 'Usage: %s' % (sys.argv[0])
+ sys.exit(1)
+
+conf_files = sys.argv[1:]
+
+for conf_filename in conf_files:
+ conf = yaml.load(open(conf_filename))
+
+ # update the config
+ modified = False
+ for k in sorted(os.environ.keys()):
+ key_parts = k.split('_')
+ v = os.environ[k]
+
+ i = 0
+ conf_to_modify = conf
+ while i < len(key_parts):
+ key_part = key_parts[i]
+ if not key_part in conf_to_modify:
+ break
+
+ if i == (len(key_parts) - 1):
+ if key_part == 'workerPort':
+ conf_to_modify[key_part] = int(v)
+ else:
+ conf_to_modify[key_part] = v
+
+ modified = True
+ else:
+ conf_to_modify = conf_to_modify[key_part]
+ i += 1
+ # Store back the updated config in the same file
+ f = open(conf_filename , 'w')
+ yaml.dump(conf, f, default_flow_style=False)
+ f.close()
diff --git a/tests/docker-images/latest-version-image/Dockerfile
b/tests/docker-images/latest-version-image/Dockerfile
index deb0676..e638ba0 100644
--- a/tests/docker-images/latest-version-image/Dockerfile
+++ b/tests/docker-images/latest-version-image/Dockerfile
@@ -24,12 +24,12 @@ RUN apt-get update && apt-get install -y supervisor
RUN mkdir -p /var/log/pulsar && mkdir -p /var/run/supervisor/ && mkdir -p
/pulsar/ssl
COPY conf/supervisord.conf /etc/supervisord.conf
-COPY conf/global-zk.conf conf/local-zk.conf conf/bookie.conf conf/broker.conf \
+COPY conf/global-zk.conf conf/local-zk.conf conf/bookie.conf conf/broker.conf
conf/functions_worker.conf \
conf/proxy.conf /etc/supervisord/conf.d/
COPY ssl/ca.cert.pem ssl/broker.key-pk8.pem ssl/broker.cert.pem \
ssl/admin.key-pk8.pem ssl/admin.cert.pem /pulsar/ssl/
COPY scripts/init-cluster.sh scripts/run-global-zk.sh scripts/run-local-zk.sh \
- scripts/run-bookie.sh scripts/run-broker.sh scripts/run-proxy.sh
/pulsar/bin/
+ scripts/run-bookie.sh scripts/run-broker.sh
scripts/run-functions-worker.sh scripts/run-proxy.sh /pulsar/bin/
diff --git a/docker/pulsar/Dockerfile
b/tests/docker-images/latest-version-image/conf/functions_worker.conf
similarity index 62%
copy from docker/pulsar/Dockerfile
copy to tests/docker-images/latest-version-image/conf/functions_worker.conf
index 87ef64f..637be95 100644
--- a/docker/pulsar/Dockerfile
+++ b/tests/docker-images/latest-version-image/conf/functions_worker.conf
@@ -17,23 +17,11 @@
# under the License.
#
-FROM openjdk:8-jdk
+[program:functions-worker]
+autostart=false
+redirect_stderr=true
+stdout_logfile=/var/log/pulsar/functions_worker.log
+directory=/pulsar
+environment=PULSAR_MEM=-Xms128M
+command=/pulsar/bin/pulsar functions-worker
-# Install some utilities
-RUN apt-get update && apt-get install -y netcat dnsutils python-kazoo
-
-ARG PULSAR_TARBALL
-
-ADD ${PULSAR_TARBALL} /
-RUN mv /apache-pulsar-* /pulsar
-
-COPY scripts/apply-config-from-env.py /pulsar/bin
-COPY scripts/generate-zookeeper-config.sh /pulsar/bin
-COPY scripts/pulsar-zookeeper-ruok.sh /pulsar/bin
-COPY scripts/watch-znode.py /pulsar/bin
-
-WORKDIR /pulsar
-
-VOLUME ["/pulsar/conf", "/pulsar/data"]
-
-ENV PULSAR_ROOT_LOGGER=INFO,CONSOLE
diff --git a/docker/pulsar/Dockerfile
b/tests/docker-images/latest-version-image/scripts/run-functions-worker.sh
old mode 100644
new mode 100755
similarity index 62%
copy from docker/pulsar/Dockerfile
copy to tests/docker-images/latest-version-image/scripts/run-functions-worker.sh
index 87ef64f..3fadf96
--- a/docker/pulsar/Dockerfile
+++ b/tests/docker-images/latest-version-image/scripts/run-functions-worker.sh
@@ -1,3 +1,4 @@
+#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
@@ -17,23 +18,14 @@
# under the License.
#
-FROM openjdk:8-jdk
+bin/apply-config-from-env.py conf/client.conf && \
+ bin/gen-yml-from-env.py conf/functions_worker.yml && \
+ bin/apply-config-from-env.py conf/pulsar_env.sh
-# Install some utilities
-RUN apt-get update && apt-get install -y netcat dnsutils python-kazoo
+if [ -z "$NO_AUTOSTART" ]; then
+ sed -i 's/autostart=.*/autostart=true/'
/etc/supervisord/conf.d/functions_worker.conf
+fi
-ARG PULSAR_TARBALL
+bin/watch-znode.py -z $zookeeperServers -p /initialized-$clusterName -w
+exec /usr/bin/supervisord -c /etc/supervisord.conf
-ADD ${PULSAR_TARBALL} /
-RUN mv /apache-pulsar-* /pulsar
-
-COPY scripts/apply-config-from-env.py /pulsar/bin
-COPY scripts/generate-zookeeper-config.sh /pulsar/bin
-COPY scripts/pulsar-zookeeper-ruok.sh /pulsar/bin
-COPY scripts/watch-znode.py /pulsar/bin
-
-WORKDIR /pulsar
-
-VOLUME ["/pulsar/conf", "/pulsar/data"]
-
-ENV PULSAR_ROOT_LOGGER=INFO,CONSOLE
diff --git
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/PulsarContainer.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/PulsarContainer.java
index eb0ae84..0a0b159 100644
---
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/PulsarContainer.java
+++
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/PulsarContainer.java
@@ -68,10 +68,9 @@ public abstract class PulsarContainer<SelfT extends
PulsarContainer<SelfT>> exte
@Override
protected void configure() {
if (httpPort > 0) {
- addExposedPorts(
- servicePort, httpPort
- );
- } else if (servicePort > 0) {
+ addExposedPorts(httpPort);
+ }
+ if (servicePort > 0) {
addExposedPort(servicePort);
}
}
@@ -80,7 +79,7 @@ public abstract class PulsarContainer<SelfT extends
PulsarContainer<SelfT>> exte
public void start() {
if (httpPort > 0 || servicePort > 0) {
this.waitStrategy = new HostPortWaitStrategy()
- .withStartupTimeout(Duration.of(60, SECONDS));
+ .withStartupTimeout(Duration.of(300, SECONDS));
}
this.withCreateContainerCmdModifier(createContainerCmd -> {
createContainerCmd.withHostName(hostname);
diff --git
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/WorkerContainer.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/WorkerContainer.java
new file mode 100644
index 0000000..df8a28e
--- /dev/null
+++
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/WorkerContainer.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.containers;
+
+/**
+ * A pulsar container that runs a functions worker.
+ */
+public class WorkerContainer extends PulsarContainer<WorkerContainer> {
+
+ public WorkerContainer(String clusterName, String hostname) {
+ super(
+ clusterName, hostname, hostname, "bin/run-functions-worker.sh",
-1, BROKER_HTTP_PORT);
+ }
+}
diff --git
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
index 167908e..501606e 100644
---
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
+++
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
@@ -36,6 +36,7 @@ import org.apache.pulsar.tests.containers.BrokerContainer;
import org.apache.pulsar.tests.containers.CSContainer;
import org.apache.pulsar.tests.containers.ProxyContainer;
import org.apache.pulsar.tests.containers.PulsarContainer;
+import org.apache.pulsar.tests.containers.WorkerContainer;
import org.apache.pulsar.tests.containers.ZKContainer;
import org.testcontainers.containers.Container.ExecResult;
import org.testcontainers.containers.Network;
@@ -67,6 +68,7 @@ public class PulsarCluster {
private final CSContainer csContainer;
private final Map<String, BKContainer> bookieContainers;
private final Map<String, BrokerContainer> brokerContainers;
+ private final Map<String, WorkerContainer> workerContainers;
private final ProxyContainer proxyContainer;
private PulsarCluster(PulsarClusterSpec spec) {
@@ -86,9 +88,11 @@ public class PulsarCluster {
.withNetworkAliases(CSContainer.NAME);
this.bookieContainers = Maps.newTreeMap();
this.brokerContainers = Maps.newTreeMap();
+ this.workerContainers = Maps.newTreeMap();
this.proxyContainer = new ProxyContainer(clusterName, "pulsar-proxy")
.withNetwork(network)
.withNetworkAliases("pulsar-proxy")
+ .withEnv("zkServers", ZKContainer.NAME)
.withEnv("zookeeperServers", ZKContainer.NAME)
.withEnv("configurationStoreServers", CSContainer.NAME + ":" +
CS_PORT)
.withEnv("clusterName", clusterName);
@@ -132,6 +136,7 @@ public class PulsarCluster {
runNumContainers("broker", spec.numBrokers(), (name) -> new
BrokerContainer(clusterName, name)
.withNetwork(network)
.withNetworkAliases(name)
+ .withEnv("zkServers", ZKContainer.NAME)
.withEnv("zookeeperServers", ZKContainer.NAME)
.withEnv("configurationStoreServers", CSContainer.NAME + ":" +
CS_PORT)
.withEnv("clusterName", clusterName)
@@ -166,6 +171,7 @@ public class PulsarCluster {
public void stop() {
proxyContainer.stop();
+ workerContainers.values().forEach(WorkerContainer::stop);
brokerContainers.values().forEach(BrokerContainer::stop);
bookieContainers.values().forEach(BKContainer::stop);
csContainer.stop();
@@ -177,12 +183,48 @@ public class PulsarCluster {
}
}
+ public void startFunctionWorkers(int numFunctionWorkers) {
+ String serviceUrl = "pulsar://pulsar-broker-0:" +
PulsarContainer.BROKER_PORT;
+ String httpServiceUrl = "http://pulsar-broker-0:" +
PulsarContainer.BROKER_HTTP_PORT;
+ workerContainers.putAll(runNumContainers(
+ "functions-worker",
+ numFunctionWorkers,
+ (name) -> new WorkerContainer(clusterName, name)
+ .withNetwork(network)
+ .withNetworkAliases(name)
+ // worker settings
+ .withEnv("workerId", name)
+ .withEnv("workerHostname", name)
+ .withEnv("workerPort", "" + PulsarContainer.BROKER_HTTP_PORT)
+ .withEnv("pulsarFunctionsCluster", clusterName)
+ .withEnv("pulsarServiceUrl", serviceUrl)
+ .withEnv("pulsarWebServiceUrl", httpServiceUrl)
+ .withEnv("clusterName", clusterName)
+ // script
+ .withEnv("zookeeperServers", ZKContainer.NAME)
+ // bookkeeper tools
+ .withEnv("zkServers", ZKContainer.NAME)
+ ));
+ }
+
public BrokerContainer getAnyBroker() {
- List<BrokerContainer> brokerList = Lists.newArrayList();
- brokerList.addAll(brokerContainers.values());
- Collections.shuffle(brokerList);
- checkArgument(!brokerList.isEmpty(), "No broker is alive");
- return brokerList.get(0);
+ return getAnyContainer(brokerContainers, "broker");
+ }
+
+ public WorkerContainer getAnyWorker() {
+ return getAnyContainer(workerContainers, "functions-worker");
+ }
+
+ private <T> T getAnyContainer(Map<String, T> containers, String
serviceName) {
+ List<T> containerList = Lists.newArrayList();
+ containerList.addAll(containers.values());
+ Collections.shuffle(containerList);
+ checkArgument(!containerList.isEmpty(), "No " + serviceName + " is
alive");
+ return containerList.get(0);
+ }
+
+ public Collection<BrokerContainer> getBrokers() {
+ return brokerContainers.values();
}
public Collection<BrokerContainer> getBrokers() {
diff --git
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
new file mode 100644
index 0000000..07d86f7
--- /dev/null
+++
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.functions;
+
+import static org.testng.Assert.assertTrue;
+
+import com.google.common.io.Files;
+import java.io.File;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.tests.containers.WorkerContainer;
+import
org.apache.pulsar.tests.integration.functions.utils.UploadDownloadCommandGenerator;
+import org.apache.pulsar.tests.topologies.PulsarCluster;
+import org.testcontainers.containers.Container.ExecResult;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class PulsarFunctionsTest extends PulsarFunctionsTestBase {
+
+ @Test
+ public String checkUpload() throws Exception {
+ String bkPkgPath = String.format("%s/%s/%s",
+ "tenant-" + randomName(8),
+ "ns-" + randomName(8),
+ "fn-" + randomName(8));
+
+ UploadDownloadCommandGenerator generator =
UploadDownloadCommandGenerator.createUploader(
+ PulsarCluster.ADMIN_SCRIPT,
+ bkPkgPath);
+ String actualCommand = generator.generateCommand();
+
+ log.info(actualCommand);
+
+ String[] commands = {
+ "sh", "-c", actualCommand
+ };
+ ExecResult output = pulsarCluster.getAnyWorker().execCmd(commands);
+ assertTrue(output.getStdout().contains("\"Uploaded successfully\""));
+ return bkPkgPath;
+ }
+
+ @Test
+ public void checkDownload() throws Exception {
+ String bkPkgPath = checkUpload();
+ String localPkgFile = "/tmp/checkdownload-" + randomName(16);
+
+ UploadDownloadCommandGenerator generator =
UploadDownloadCommandGenerator.createDownloader(
+ localPkgFile,
+ bkPkgPath);
+ String actualCommand = generator.generateCommand();
+
+ log.info(actualCommand);
+
+ String[] commands = {
+ "sh", "-c", actualCommand
+ };
+ WorkerContainer container = pulsarCluster.getAnyWorker();
+ ExecResult output = container.execCmd(commands);
+ assertTrue(output.getStdout().contains("\"Downloaded successfully\""));
+ String[] diffCommand = {
+ "diff",
+ PulsarCluster.ADMIN_SCRIPT,
+ localPkgFile
+ };
+ output = container.execCmd(diffCommand);
+ assertTrue(output.getStdout().isEmpty());
+ assertTrue(output.getStderr().isEmpty());
+ }
+
+}
diff --git
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
new file mode 100644
index 0000000..8e692ad
--- /dev/null
+++
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.functions;
+
+import org.apache.pulsar.tests.topologies.PulsarClusterTestBase;
+import org.testng.annotations.BeforeClass;
+
+/**
+ * A cluster to run pulsar functions for testing functions-related features.
+ */
+public class PulsarFunctionsTestBase extends PulsarClusterTestBase {
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ PulsarClusterTestBase.setupCluster();
+
+ pulsarCluster.startFunctionWorkers(1);
+ }
+
+}
diff --git
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/utils/UploadDownloadCommandGenerator.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/utils/UploadDownloadCommandGenerator.java
new file mode 100644
index 0000000..adc0228
--- /dev/null
+++
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/utils/UploadDownloadCommandGenerator.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.functions.utils;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import org.apache.pulsar.tests.topologies.PulsarCluster;
+
+@Getter
+@Setter
+@ToString
+public class UploadDownloadCommandGenerator {
+ public enum MODE {
+ UPLOAD,
+ DOWNLOAD,
+ };
+ private MODE mode;
+ private String bkPath;
+ private String localPath;
+ private String brokerUrl;
+
+ public static UploadDownloadCommandGenerator createUploader(String
localPath, String bkPath) {
+ return new UploadDownloadCommandGenerator(MODE.UPLOAD, localPath,
bkPath);
+ }
+
+ public static UploadDownloadCommandGenerator createDownloader(String
localPath, String bkPath) {
+ return new UploadDownloadCommandGenerator(MODE.DOWNLOAD, localPath,
bkPath);
+ }
+
+ public UploadDownloadCommandGenerator(MODE mode, String localPath, String
bkPath) {
+ this.mode = mode;
+ this.localPath = localPath;
+ this.bkPath = bkPath;
+ }
+
+ public void createBrokerUrl(String host, int port) {
+ brokerUrl = "pulsar://" + host + ":" + port;
+ }
+
+ public String generateCommand() {
+ StringBuilder commandBuilder = new
StringBuilder().append(PulsarCluster.ADMIN_SCRIPT).append(" functions ");
+ if (mode == MODE.UPLOAD) {
+ commandBuilder.append(" upload ");
+ } else {
+ commandBuilder.append(" download ");
+ }
+ commandBuilder.append(" --path ");
+ commandBuilder.append(bkPath);
+ if (mode == MODE.UPLOAD) {
+ commandBuilder.append(" --sourceFile ");
+ } else {
+ commandBuilder.append(" --destinationFile ");
+ }
+ commandBuilder.append(localPath);
+ return commandBuilder.toString();
+ }
+}