This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 10f273a  Add numFunctionWorkers and externalServices to cluster spec 
(#2160)
10f273a is described below

commit 10f273aa00e03f83e77847d8d4b855cc40a6f23b
Author: Sijie Guo <[email protected]>
AuthorDate: Fri Jul 13 17:03:43 2018 -0700

    Add numFunctionWorkers and externalServices to cluster spec (#2160)
    
    *Motivation*
    
    ClusterSpec is used for defining what a cluster looks like for integration
    testing. Add `numFunctionWorkers` and `externalServices` to the cluster spec,
    so we have a common place for setting up a cluster.
    
    *Changes*
    
    - `numFunctionWorkers`: defines how many function workers to run in the
    cluster. When a cluster is created from the cluster spec, it will start
    the same number of worker containers.
    
    - `functionRuntimeType`: define how the worker will invoke functions,
    whether it is in process mode or thread mode.
    
    - `externalServices`: defines whether there are more external services to
    run along with the cluster. For example, we need Cassandra or Kafka for
    testing connectors, and we need an S3 mock for testing offloaders.
---
 .../tests/containers/CassandraContainer.java}      | 33 +++++++-----
 .../pulsar/tests/containers/KafkaContainer.java    | 63 ++++++++++++++++++++++
 .../tests/containers/KafkaProxyContainer.java      | 51 ++++++++++++++++++
 .../tests/topologies/FunctionRuntimeType.java}     | 24 ++-------
 .../pulsar/tests/topologies/PulsarCluster.java     | 47 ++++++++++------
 .../pulsar/tests/topologies/PulsarClusterSpec.java | 28 ++++++++++
 .../tests/topologies/PulsarClusterTestBase.java    |  3 +-
 .../src/main/resources/kafka-zookeeper.properties  | 36 +++++++++++++
 .../functions/PulsarFunctionsTestBase.java         | 33 ++++++++++++
 .../runtime/PulsarFunctionsRuntimeTest.java        | 23 ++++----
 10 files changed, 282 insertions(+), 59 deletions(-)

diff --git 
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsThreadRuntimeTest.java
 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/CassandraContainer.java
similarity index 51%
rename from 
tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsThreadRuntimeTest.java
rename to 
tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/CassandraContainer.java
index 5772ee3..ebfa000 100644
--- 
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsThreadRuntimeTest.java
+++ 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/CassandraContainer.java
@@ -16,28 +16,33 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.tests.integration.functions.runtime;
+package org.apache.pulsar.tests.containers;
 
 import lombok.extern.slf4j.Slf4j;
-import org.testcontainers.containers.Container.ExecResult;
-import org.testng.annotations.BeforeClass;
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
 
 /**
- * Run the runtime test cases in thread mode.
+ * Cassandra Container.
  */
 @Slf4j
-public class PulsarFunctionsThreadRuntimeTest extends 
PulsarFunctionsRuntimeTest {
+public class CassandraContainer<SelfT extends ChaosContainer<SelfT>> extends 
ChaosContainer<SelfT> {
 
-    public PulsarFunctionsThreadRuntimeTest() {
-        super(RuntimeFactory.THREAD);
-    }
+    public static final String NAME = "cassandra";
+    public static final int PORT = 9042;
 
-    @BeforeClass
-    public void setupCluster() throws Exception {
-        super.setupCluster(RuntimeFactory.THREAD.toString());
-        pulsarCluster.startFunctionWorkersWithThreadContainerFactory(1);
-        ExecResult result = pulsarCluster.getAnyWorker().execCmd("cat", 
"/pulsar/conf/functions_worker.yml");
-        log.info("Functions Worker Config : \n{}", result.getStdout());
+    public CassandraContainer(String clusterName) {
+        super(clusterName, "cassandra:3");
     }
 
+    @Override
+    protected void configure() {
+        super.configure();
+        this.withNetworkAliases(NAME)
+            .withExposedPorts(PORT)
+            .withCreateContainerCmdModifier(createContainerCmd -> {
+                createContainerCmd.withHostName(NAME);
+                createContainerCmd.withName(clusterName + "-" + NAME);
+            })
+            .waitingFor(new HostPortWaitStrategy());
+    }
 }
diff --git 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/KafkaContainer.java
 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/KafkaContainer.java
new file mode 100644
index 0000000..83b5e42
--- /dev/null
+++ 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/KafkaContainer.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.containers;
+
+import lombok.extern.slf4j.Slf4j;
+import org.testcontainers.containers.BindMode;
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
+
+/**
+ * Kafka Container.
+ */
+@Slf4j
+public class KafkaContainer<SelfT extends ChaosContainer<SelfT>> extends 
ChaosContainer<SelfT> {
+
+    public static final String NAME = "kafka";
+    public static final int INTERNAL_PORT = 9092;
+    public static final int PORT = 9093;
+
+    public KafkaContainer(String clusterName) {
+        super(clusterName, "confluentinc/cp-kafka:4.1.1");
+    }
+
+    @Override
+    protected void configure() {
+        super.configure();
+        this.withNetworkAliases(NAME)
+            .withExposedPorts(INTERNAL_PORT, PORT)
+            .withClasspathResourceMapping(
+                "kafka-zookeeper.properties", "/zookeeper.properties",
+                BindMode.READ_ONLY)
+            .withCommand("sh", "-c", "zookeeper-server-start 
/zookeeper.properties & /etc/confluent/docker/run")
+            .withEnv("KAFKA_LISTENERS",
+                "INTERNAL://kafka:" + INTERNAL_PORT + ",PLAINTEXT://" + 
"0.0.0.0" + ":" + PORT)
+            .withEnv("KAFKA_ZOOKEEPER_CONNECT", "localhost:2181")
+            .withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", 
"INTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
+            .withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "INTERNAL")
+            .withEnv("KAFKA_BROKER_ID", "1")
+            .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
+            .withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1")
+            .withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + "")
+            .withCreateContainerCmdModifier(createContainerCmd -> {
+                createContainerCmd.withHostName(NAME);
+                createContainerCmd.withName(clusterName + "-" + NAME);
+            })
+            .waitingFor(new HostPortWaitStrategy());
+    }
+}
diff --git 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/KafkaProxyContainer.java
 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/KafkaProxyContainer.java
new file mode 100644
index 0000000..052db7e
--- /dev/null
+++ 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/KafkaProxyContainer.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.containers;
+
+import lombok.extern.slf4j.Slf4j;
+import org.testcontainers.containers.SocatContainer;
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
+
+/**
+ * Kafka Proxy Container.
+ */
+@Slf4j
+public class KafkaProxyContainer extends SocatContainer {
+
+    public static final String NAME = "kafka-proxy";
+
+    private final String clusterName;
+
+    public KafkaProxyContainer(String clusterName) {
+        super();
+        this.clusterName = clusterName;
+    }
+
+    @Override
+    protected void configure() {
+        super.configure();
+        this.withNetworkAliases(NAME)
+            .withTarget(KafkaContainer.PORT, KafkaContainer.NAME)
+            .withCreateContainerCmdModifier(createContainerCmd -> {
+                createContainerCmd.withHostName(NAME);
+                createContainerCmd.withName(clusterName + "-" + NAME);
+            })
+            .waitingFor(new HostPortWaitStrategy());
+    }
+}
diff --git 
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsProcessRuntimeTest.java
 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/FunctionRuntimeType.java
similarity index 50%
rename from 
tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsProcessRuntimeTest.java
rename to 
tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/FunctionRuntimeType.java
index ebc6e99..a3efd31 100644
--- 
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsProcessRuntimeTest.java
+++ 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/FunctionRuntimeType.java
@@ -16,26 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.tests.integration.functions.runtime;
-
-import lombok.extern.slf4j.Slf4j;
-import org.testcontainers.containers.Container;
-import org.testng.annotations.BeforeClass;
+package org.apache.pulsar.tests.topologies;
 
 /**
- * Run runtime tests in process mode.
+ * Runtime type to run functions.
  */
-@Slf4j
-public class PulsarFunctionsProcessRuntimeTest extends 
PulsarFunctionsRuntimeTest {
-    public PulsarFunctionsProcessRuntimeTest() {
-        super(RuntimeFactory.PROCESS);
-    }
-
-    @BeforeClass
-    public void setupCluster() throws Exception {
-        super.setupCluster(RuntimeFactory.PROCESS.toString());
-        pulsarCluster.startFunctionWorkersWithProcessContainerFactory(1);
-        Container.ExecResult result = 
pulsarCluster.getAnyWorker().execCmd("cat", 
"/pulsar/conf/functions_worker.yml");
-        log.info("Functions Worker Config : \n{}", result.getStdout());
-    }
+public enum FunctionRuntimeType {
+    PROCESS,
+    THREAD
 }
diff --git 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
index d0c9eb4..6983a1c 100644
--- 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
+++ 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
@@ -21,20 +21,15 @@ package org.apache.pulsar.tests.topologies;
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.pulsar.tests.containers.PulsarContainer.CS_PORT;
 
-import com.github.dockerjava.api.command.CreateContainerCmd;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
 import java.util.function.Function;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import lombok.Getter;
@@ -49,7 +44,6 @@ import org.apache.pulsar.tests.containers.ZKContainer;
 import org.testcontainers.containers.Container.ExecResult;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.Network;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
 
 /**
  * Pulsar Cluster in containers.
@@ -167,6 +161,27 @@ public class PulsarCluster {
         log.info("Pulsar cluster {} is up running:", clusterName);
         log.info("\tBinary Service Url : {}", getPlainTextServiceUrl());
         log.info("\tHttp Service Url : {}", getHttpServiceUrl());
+
+        // start function workers
+        if (spec.numFunctionWorkers() > 0) {
+            switch (spec.functionRuntimeType()) {
+                case THREAD:
+                    
startFunctionWorkersWithThreadContainerFactory(spec.numFunctionWorkers());
+                    break;
+                case PROCESS:
+                    
startFunctionWorkersWithProcessContainerFactory(spec.numFunctionWorkers());
+                    break;
+            }
+        }
+
+        // start external services
+        Map<String, GenericContainer<?>> externalServices = 
spec.externalServices;
+        if (null != externalServices) {
+            externalServices.entrySet().forEach(service -> {
+                service.getValue().start();
+                log.info("Successfully start external service {}.", 
service.getKey());
+            });
+        }
     }
 
     private static <T extends PulsarContainer> Map<String, T> 
runNumContainers(String serviceName,
@@ -186,13 +201,15 @@ public class PulsarCluster {
     }
 
     public void stop() {
-
-        Stream<GenericContainer> list1 = Stream.of(proxyContainer, 
csContainer, zkContainer);
-        Stream<GenericContainer> list2 =
-            Stream.of(workerContainers.values(), brokerContainers.values(), 
bookieContainers.values())
-                .flatMap(Collection::stream);
-        Stream<GenericContainer> list3 = Stream.concat(list1, list2);
-        list3.parallel().forEach(GenericContainer::stop);
+        Stream.of(proxyContainer, csContainer, 
zkContainer).parallel().forEach(GenericContainer::stop);
+        
workerContainers.values().parallelStream().forEach(GenericContainer::stop);
+        
brokerContainers.values().parallelStream().forEach(GenericContainer::stop);
+        
bookieContainers.values().parallelStream().forEach(GenericContainer::stop);
+        if (null != spec.externalServices()) {
+            spec.externalServices().values()
+                .parallelStream()
+                .forEach(GenericContainer::stop);
+        }
 
         try {
             network.close();
@@ -201,7 +218,7 @@ public class PulsarCluster {
         }
     }
 
-    public void startFunctionWorkersWithProcessContainerFactory(int 
numFunctionWorkers) {
+    private void startFunctionWorkersWithProcessContainerFactory(int 
numFunctionWorkers) {
         String serviceUrl = "pulsar://pulsar-broker-0:" + 
PulsarContainer.BROKER_PORT;
         String httpServiceUrl = "http://pulsar-broker-0:" + 
PulsarContainer.BROKER_HTTP_PORT;
         workerContainers.putAll(runNumContainers(
@@ -225,7 +242,7 @@ public class PulsarCluster {
         ));
     }
 
-    public void startFunctionWorkersWithThreadContainerFactory(int 
numFunctionWorkers) {
+    private void startFunctionWorkersWithThreadContainerFactory(int 
numFunctionWorkers) {
         String serviceUrl = "pulsar://pulsar-broker-0:" + 
PulsarContainer.BROKER_PORT;
         String httpServiceUrl = "http://pulsar-broker-0:" + 
PulsarContainer.BROKER_HTTP_PORT;
         workerContainers.putAll(runNumContainers(
diff --git 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterSpec.java
 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterSpec.java
index 85c9f51..fd830e5 100644
--- 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterSpec.java
+++ 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterSpec.java
@@ -18,11 +18,15 @@
  */
 package org.apache.pulsar.tests.topologies;
 
+import java.util.Map;
 import lombok.Builder;
 import lombok.Builder.Default;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.experimental.Accessors;
+import org.apache.pulsar.tests.containers.ChaosContainer;
+import org.testcontainers.containers.GenericContainer;
+import org.testng.collections.Maps;
 
 /**
  * Spec to build a pulsar cluster.
@@ -65,6 +69,30 @@ public class PulsarClusterSpec {
     int numProxies = 1;
 
     /**
+     * Returns number of function workers.
+     *
+     * @return number of function workers.
+     */
+    @Default
+    int numFunctionWorkers = 0;
+
+    /**
+     * Returns the function runtime type.
+     *
+     * @return the function runtime type.
+     */
+    @Default
+    FunctionRuntimeType functionRuntimeType = FunctionRuntimeType.PROCESS;
+
+    /**
+     * Returns the map of external services to start with
+     * this cluster.
+     *
+     * @return the map of external services to start with the cluster.
+     */
+    Map<String, GenericContainer<?>> externalServices = Maps.newHashMap();
+
+    /**
      * Returns the flag whether to enable/disable container log.
      *
      * @return the flag whether to enable/disable container log.
diff --git 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
index c2e4b88..24e9184 100644
--- 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
+++ 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
@@ -62,7 +62,6 @@ public abstract class PulsarClusterTestBase {
     @BeforeClass
     public void setupCluster() throws Exception {
         this.setupCluster("");
-        pulsarCluster.startFunctionWorkersWithProcessContainerFactory(1);
     }
 
     public void setupCluster(String namePrefix) throws Exception {
@@ -75,7 +74,7 @@ public abstract class PulsarClusterTestBase {
         setupCluster(spec);
     }
 
-    private void setupCluster(PulsarClusterSpec spec) throws Exception {
+    protected void setupCluster(PulsarClusterSpec spec) throws Exception {
         log.info("Setting up cluster {} with {} bookies, {} brokers",
             spec.clusterName(), spec.numBookies(), spec.numBrokers());
 
diff --git 
a/tests/integration-tests-topologies/src/main/resources/kafka-zookeeper.properties
 
b/tests/integration-tests-topologies/src/main/resources/kafka-zookeeper.properties
new file mode 100644
index 0000000..f7d1f92
--- /dev/null
+++ 
b/tests/integration-tests-topologies/src/main/resources/kafka-zookeeper.properties
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+clientPort=2181
+dataDir=/var/lib/zookeeper/data
+dataLogDir=/var/lib/zookeeper/log
\ No newline at end of file
diff --git 
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
index 180e7f9..8d22404 100644
--- 
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
+++ 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
@@ -18,9 +18,14 @@
  */
 package org.apache.pulsar.tests.integration.functions;
 
+import static java.util.stream.Collectors.joining;
+
+import java.util.stream.Stream;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.pulsar.tests.integration.functions.runtime.PulsarFunctionsRuntimeTest;
 import 
org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
+import org.apache.pulsar.tests.topologies.FunctionRuntimeType;
+import org.apache.pulsar.tests.topologies.PulsarClusterSpec;
 import org.apache.pulsar.tests.topologies.PulsarClusterTestBase;
 import org.testcontainers.containers.Container.ExecResult;
 import org.testng.annotations.BeforeClass;
@@ -32,6 +37,34 @@ import org.testng.annotations.DataProvider;
 @Slf4j
 public abstract class PulsarFunctionsTestBase extends PulsarClusterTestBase  {
 
+    protected final FunctionRuntimeType functionRuntimeType;
+
+    public PulsarFunctionsTestBase() {
+        this(FunctionRuntimeType.PROCESS);
+    }
+
+    protected PulsarFunctionsTestBase(FunctionRuntimeType functionRuntimeType) 
{
+        this.functionRuntimeType = functionRuntimeType;
+    }
+
+    @BeforeClass
+    @Override
+    public void setupCluster() throws Exception {
+        PulsarClusterSpec spec = PulsarClusterSpec.builder()
+            .clusterName(Stream.of(this.getClass().getSimpleName(), 
randomName(5))
+                .filter(s -> s != null && !s.isEmpty())
+                .collect(joining("-")))
+            .functionRuntimeType(functionRuntimeType)
+            .numFunctionWorkers(2)
+            .build();
+
+        super.setupCluster(spec);
+    }
+
+    //
+    // Common Variables used by functions test
+    //
+
     public static final String EXCLAMATION_JAVA_CLASS =
         "org.apache.pulsar.functions.api.examples.ExclamationFunction";
 
diff --git 
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsRuntimeTest.java
 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsRuntimeTest.java
index f656886..6f49821 100644
--- 
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsRuntimeTest.java
+++ 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsRuntimeTest.java
@@ -31,24 +31,29 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.tests.integration.functions.PulsarFunctionsTestBase;
 import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
 import 
org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
+import org.apache.pulsar.tests.topologies.FunctionRuntimeType;
 import org.apache.pulsar.tests.topologies.PulsarCluster;
 import org.testcontainers.containers.Container.ExecResult;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Factory;
 import org.testng.annotations.Test;
 
 /**
  * The tests that run over different container mode.
  */
-public abstract class PulsarFunctionsRuntimeTest extends 
PulsarFunctionsTestBase {
+public class PulsarFunctionsRuntimeTest extends PulsarFunctionsTestBase {
 
-    public enum RuntimeFactory {
-        PROCESS,
-        THREAD
+    @DataProvider(name = "FunctionRuntimeTypes")
+    public static Object[][] getData() {
+        return new Object[][] {
+            { FunctionRuntimeType.PROCESS },
+            { FunctionRuntimeType.THREAD }
+        };
     }
 
-    private final RuntimeFactory runtimeFactory;
-
-    public PulsarFunctionsRuntimeTest(RuntimeFactory runtimeFactory) {
-        this.runtimeFactory = runtimeFactory;
+    @Factory(dataProvider = "FunctionRuntimeTypes")
+    PulsarFunctionsRuntimeTest(FunctionRuntimeType functionRuntimeType) {
+        super(functionRuntimeType);
     }
 
     //
@@ -57,7 +62,7 @@ public abstract class PulsarFunctionsRuntimeTest extends 
PulsarFunctionsTestBase
 
     @Test(dataProvider = "FunctionRuntimes")
     public void testExclamationFunction(Runtime runtime) throws Exception {
-        if (runtimeFactory == RuntimeFactory.THREAD && runtime == 
Runtime.PYTHON) {
+        if (functionRuntimeType == FunctionRuntimeType.THREAD && runtime == 
Runtime.PYTHON) {
             // python can only run on process mode
             return;
         }

Reply via email to