sijie closed pull request #2160: Add numFunctionWorkers and externalServices to
cluster spec
URL: https://github.com/apache/incubator-pulsar/pull/2160
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsThreadRuntimeTest.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/CassandraContainer.java
similarity index 51%
rename from
tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsThreadRuntimeTest.java
rename to
tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/CassandraContainer.java
index 5772ee313f..ebfa000764 100644
---
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsThreadRuntimeTest.java
+++
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/CassandraContainer.java
@@ -16,28 +16,33 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.tests.integration.functions.runtime;
+package org.apache.pulsar.tests.containers;
import lombok.extern.slf4j.Slf4j;
-import org.testcontainers.containers.Container.ExecResult;
-import org.testng.annotations.BeforeClass;
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
/**
- * Run the runtime test cases in thread mode.
+ * Cassandra Container.
*/
@Slf4j
-public class PulsarFunctionsThreadRuntimeTest extends
PulsarFunctionsRuntimeTest {
+public class CassandraContainer<SelfT extends ChaosContainer<SelfT>> extends
ChaosContainer<SelfT> {
- public PulsarFunctionsThreadRuntimeTest() {
- super(RuntimeFactory.THREAD);
- }
+ public static final String NAME = "cassandra";
+ public static final int PORT = 9042;
- @BeforeClass
- public void setupCluster() throws Exception {
- super.setupCluster(RuntimeFactory.THREAD.toString());
- pulsarCluster.startFunctionWorkersWithThreadContainerFactory(1);
- ExecResult result = pulsarCluster.getAnyWorker().execCmd("cat",
"/pulsar/conf/functions_worker.yml");
- log.info("Functions Worker Config : \n{}", result.getStdout());
+ public CassandraContainer(String clusterName) {
+ super(clusterName, "cassandra:3");
}
+ @Override
+ protected void configure() {
+ super.configure();
+ this.withNetworkAliases(NAME)
+ .withExposedPorts(PORT)
+ .withCreateContainerCmdModifier(createContainerCmd -> {
+ createContainerCmd.withHostName(NAME);
+ createContainerCmd.withName(clusterName + "-" + NAME);
+ })
+ .waitingFor(new HostPortWaitStrategy());
+ }
}
diff --git
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/KafkaContainer.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/KafkaContainer.java
new file mode 100644
index 0000000000..83b5e42ba0
--- /dev/null
+++
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/KafkaContainer.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.containers;
+
+import lombok.extern.slf4j.Slf4j;
+import org.testcontainers.containers.BindMode;
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
+
+/**
+ * Kafka Container.
+ */
+@Slf4j
+public class KafkaContainer<SelfT extends ChaosContainer<SelfT>> extends
ChaosContainer<SelfT> {
+
+ public static final String NAME = "kafka";
+ public static final int INTERNAL_PORT = 9092;
+ public static final int PORT = 9093;
+
+ public KafkaContainer(String clusterName) {
+ super(clusterName, "confluentinc/cp-kafka:4.1.1");
+ }
+
+ @Override
+ protected void configure() {
+ super.configure();
+ this.withNetworkAliases(NAME)
+ .withExposedPorts(INTERNAL_PORT, PORT)
+ .withClasspathResourceMapping(
+ "kafka-zookeeper.properties", "/zookeeper.properties",
+ BindMode.READ_ONLY)
+ .withCommand("sh", "-c", "zookeeper-server-start
/zookeeper.properties & /etc/confluent/docker/run")
+ .withEnv("KAFKA_LISTENERS",
+ "INTERNAL://kafka:" + INTERNAL_PORT + ",PLAINTEXT://" +
"0.0.0.0" + ":" + PORT)
+ .withEnv("KAFKA_ZOOKEEPER_CONNECT", "localhost:2181")
+ .withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
"INTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
+ .withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "INTERNAL")
+ .withEnv("KAFKA_BROKER_ID", "1")
+ .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
+ .withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1")
+ .withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + "")
+ .withCreateContainerCmdModifier(createContainerCmd -> {
+ createContainerCmd.withHostName(NAME);
+ createContainerCmd.withName(clusterName + "-" + NAME);
+ })
+ .waitingFor(new HostPortWaitStrategy());
+ }
+}
diff --git
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/KafkaProxyContainer.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/KafkaProxyContainer.java
new file mode 100644
index 0000000000..052db7e8b0
--- /dev/null
+++
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/KafkaProxyContainer.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.containers;
+
+import lombok.extern.slf4j.Slf4j;
+import org.testcontainers.containers.SocatContainer;
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
+
+/**
+ * Kafka Proxy Container.
+ */
+@Slf4j
+public class KafkaProxyContainer extends SocatContainer {
+
+ public static final String NAME = "kafka-proxy";
+
+ private final String clusterName;
+
+ public KafkaProxyContainer(String clusterName) {
+ super();
+ this.clusterName = clusterName;
+ }
+
+ @Override
+ protected void configure() {
+ super.configure();
+ this.withNetworkAliases(NAME)
+ .withTarget(KafkaContainer.PORT, KafkaContainer.NAME)
+ .withCreateContainerCmdModifier(createContainerCmd -> {
+ createContainerCmd.withHostName(NAME);
+ createContainerCmd.withName(clusterName + "-" + NAME);
+ })
+ .waitingFor(new HostPortWaitStrategy());
+ }
+}
diff --git
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsProcessRuntimeTest.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/FunctionRuntimeType.java
similarity index 50%
rename from
tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsProcessRuntimeTest.java
rename to
tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/FunctionRuntimeType.java
index ebc6e9974d..a3efd312f4 100644
---
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsProcessRuntimeTest.java
+++
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/FunctionRuntimeType.java
@@ -16,26 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.tests.integration.functions.runtime;
-
-import lombok.extern.slf4j.Slf4j;
-import org.testcontainers.containers.Container;
-import org.testng.annotations.BeforeClass;
+package org.apache.pulsar.tests.topologies;
/**
- * Run runtime tests in process mode.
+ * Runtime type to run functions.
*/
-@Slf4j
-public class PulsarFunctionsProcessRuntimeTest extends
PulsarFunctionsRuntimeTest {
- public PulsarFunctionsProcessRuntimeTest() {
- super(RuntimeFactory.PROCESS);
- }
-
- @BeforeClass
- public void setupCluster() throws Exception {
- super.setupCluster(RuntimeFactory.PROCESS.toString());
- pulsarCluster.startFunctionWorkersWithProcessContainerFactory(1);
- Container.ExecResult result =
pulsarCluster.getAnyWorker().execCmd("cat",
"/pulsar/conf/functions_worker.yml");
- log.info("Functions Worker Config : \n{}", result.getStdout());
- }
+public enum FunctionRuntimeType {
+ PROCESS,
+ THREAD
}
diff --git
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
index d0c9eb42a3..6983a1c00c 100644
---
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
+++
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
@@ -21,20 +21,15 @@
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.tests.containers.PulsarContainer.CS_PORT;
-import com.github.dockerjava.api.command.CreateContainerCmd;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
import java.util.function.Function;
-import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.Getter;
@@ -49,7 +44,6 @@
import org.testcontainers.containers.Container.ExecResult;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
/**
* Pulsar Cluster in containers.
@@ -167,6 +161,27 @@ public void start() throws Exception {
log.info("Pulsar cluster {} is up running:", clusterName);
log.info("\tBinary Service Url : {}", getPlainTextServiceUrl());
log.info("\tHttp Service Url : {}", getHttpServiceUrl());
+
+ // start function workers
+ if (spec.numFunctionWorkers() > 0) {
+ switch (spec.functionRuntimeType()) {
+ case THREAD:
+
startFunctionWorkersWithThreadContainerFactory(spec.numFunctionWorkers());
+ break;
+ case PROCESS:
+
startFunctionWorkersWithProcessContainerFactory(spec.numFunctionWorkers());
+ break;
+ }
+ }
+
+ // start external services
+ Map<String, GenericContainer<?>> externalServices =
spec.externalServices;
+ if (null != externalServices) {
+ externalServices.entrySet().forEach(service -> {
+ service.getValue().start();
+ log.info("Successfully start external service {}.",
service.getKey());
+ });
+ }
}
private static <T extends PulsarContainer> Map<String, T>
runNumContainers(String serviceName,
@@ -186,13 +201,15 @@ public void start() throws Exception {
}
public void stop() {
-
- Stream<GenericContainer> list1 = Stream.of(proxyContainer,
csContainer, zkContainer);
- Stream<GenericContainer> list2 =
- Stream.of(workerContainers.values(), brokerContainers.values(),
bookieContainers.values())
- .flatMap(Collection::stream);
- Stream<GenericContainer> list3 = Stream.concat(list1, list2);
- list3.parallel().forEach(GenericContainer::stop);
+ Stream.of(proxyContainer, csContainer,
zkContainer).parallel().forEach(GenericContainer::stop);
+
workerContainers.values().parallelStream().forEach(GenericContainer::stop);
+
brokerContainers.values().parallelStream().forEach(GenericContainer::stop);
+
bookieContainers.values().parallelStream().forEach(GenericContainer::stop);
+ if (null != spec.externalServices()) {
+ spec.externalServices().values()
+ .parallelStream()
+ .forEach(GenericContainer::stop);
+ }
try {
network.close();
@@ -201,7 +218,7 @@ public void stop() {
}
}
- public void startFunctionWorkersWithProcessContainerFactory(int
numFunctionWorkers) {
+ private void startFunctionWorkersWithProcessContainerFactory(int
numFunctionWorkers) {
String serviceUrl = "pulsar://pulsar-broker-0:" +
PulsarContainer.BROKER_PORT;
String httpServiceUrl = "http://pulsar-broker-0:" +
PulsarContainer.BROKER_HTTP_PORT;
workerContainers.putAll(runNumContainers(
@@ -225,7 +242,7 @@ public void
startFunctionWorkersWithProcessContainerFactory(int numFunctionWorke
));
}
- public void startFunctionWorkersWithThreadContainerFactory(int
numFunctionWorkers) {
+ private void startFunctionWorkersWithThreadContainerFactory(int
numFunctionWorkers) {
String serviceUrl = "pulsar://pulsar-broker-0:" +
PulsarContainer.BROKER_PORT;
String httpServiceUrl = "http://pulsar-broker-0:" +
PulsarContainer.BROKER_HTTP_PORT;
workerContainers.putAll(runNumContainers(
diff --git
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterSpec.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterSpec.java
index 85c9f5124d..fd830e56c1 100644
---
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterSpec.java
+++
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterSpec.java
@@ -18,11 +18,15 @@
*/
package org.apache.pulsar.tests.topologies;
+import java.util.Map;
import lombok.Builder;
import lombok.Builder.Default;
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;
+import org.apache.pulsar.tests.containers.ChaosContainer;
+import org.testcontainers.containers.GenericContainer;
+import org.testng.collections.Maps;
/**
* Spec to build a pulsar cluster.
@@ -64,6 +68,30 @@
@Default
int numProxies = 1;
+ /**
+ * Returns number of function workers.
+ *
+ * @return number of function workers.
+ */
+ @Default
+ int numFunctionWorkers = 0;
+
+ /**
+ * Returns the function runtime type.
+ *
+ * @return the function runtime type.
+ */
+ @Default
+ FunctionRuntimeType functionRuntimeType = FunctionRuntimeType.PROCESS;
+
+ /**
+ * Returns the map of external services, keyed by service name,
+ * to start with this cluster.
+ *
+ * @return the map of external services to start with the cluster.
+ */
+ Map<String, GenericContainer<?>> externalServices = Maps.newHashMap();
+
/**
* Returns the flag whether to enable/disable container log.
*
diff --git
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
index c2e4b88cb9..24e91842e5 100644
---
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
+++
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
@@ -62,7 +62,6 @@
@BeforeClass
public void setupCluster() throws Exception {
this.setupCluster("");
- pulsarCluster.startFunctionWorkersWithProcessContainerFactory(1);
}
public void setupCluster(String namePrefix) throws Exception {
@@ -75,7 +74,7 @@ public void setupCluster(String namePrefix) throws Exception {
setupCluster(spec);
}
- private void setupCluster(PulsarClusterSpec spec) throws Exception {
+ protected void setupCluster(PulsarClusterSpec spec) throws Exception {
log.info("Setting up cluster {} with {} bookies, {} brokers",
spec.clusterName(), spec.numBookies(), spec.numBrokers());
diff --git
a/tests/integration-tests-topologies/src/main/resources/kafka-zookeeper.properties
b/tests/integration-tests-topologies/src/main/resources/kafka-zookeeper.properties
new file mode 100644
index 0000000000..f7d1f92f22
--- /dev/null
+++
b/tests/integration-tests-topologies/src/main/resources/kafka-zookeeper.properties
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+clientPort=2181
+dataDir=/var/lib/zookeeper/data
+dataLogDir=/var/lib/zookeeper/log
\ No newline at end of file
diff --git
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
index 180e7f9375..8d224043e1 100644
---
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
+++
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
@@ -18,9 +18,14 @@
*/
package org.apache.pulsar.tests.integration.functions;
+import static java.util.stream.Collectors.joining;
+
+import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import
org.apache.pulsar.tests.integration.functions.runtime.PulsarFunctionsRuntimeTest;
import
org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
+import org.apache.pulsar.tests.topologies.FunctionRuntimeType;
+import org.apache.pulsar.tests.topologies.PulsarClusterSpec;
import org.apache.pulsar.tests.topologies.PulsarClusterTestBase;
import org.testcontainers.containers.Container.ExecResult;
import org.testng.annotations.BeforeClass;
@@ -32,6 +37,34 @@
@Slf4j
public abstract class PulsarFunctionsTestBase extends PulsarClusterTestBase {
+ protected final FunctionRuntimeType functionRuntimeType;
+
+ public PulsarFunctionsTestBase() {
+ this(FunctionRuntimeType.PROCESS);
+ }
+
+ protected PulsarFunctionsTestBase(FunctionRuntimeType functionRuntimeType)
{
+ this.functionRuntimeType = functionRuntimeType;
+ }
+
+ @BeforeClass
+ @Override
+ public void setupCluster() throws Exception {
+ PulsarClusterSpec spec = PulsarClusterSpec.builder()
+ .clusterName(Stream.of(this.getClass().getSimpleName(),
randomName(5))
+ .filter(s -> s != null && !s.isEmpty())
+ .collect(joining("-")))
+ .functionRuntimeType(functionRuntimeType)
+ .numFunctionWorkers(2)
+ .build();
+
+ super.setupCluster(spec);
+ }
+
+ //
+ // Common Variables used by functions test
+ //
+
public static final String EXCLAMATION_JAVA_CLASS =
"org.apache.pulsar.functions.api.examples.ExclamationFunction";
diff --git
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsRuntimeTest.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsRuntimeTest.java
index f656886b46..6f498218ba 100644
---
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsRuntimeTest.java
+++
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsRuntimeTest.java
@@ -31,24 +31,29 @@
import org.apache.pulsar.tests.integration.functions.PulsarFunctionsTestBase;
import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
import
org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
+import org.apache.pulsar.tests.topologies.FunctionRuntimeType;
import org.apache.pulsar.tests.topologies.PulsarCluster;
import org.testcontainers.containers.Container.ExecResult;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Factory;
import org.testng.annotations.Test;
/**
* The tests that run over different container mode.
*/
-public abstract class PulsarFunctionsRuntimeTest extends
PulsarFunctionsTestBase {
+public class PulsarFunctionsRuntimeTest extends PulsarFunctionsTestBase {
- public enum RuntimeFactory {
- PROCESS,
- THREAD
+ @DataProvider(name = "FunctionRuntimeTypes")
+ public static Object[][] getData() {
+ return new Object[][] {
+ { FunctionRuntimeType.PROCESS },
+ { FunctionRuntimeType.THREAD }
+ };
}
- private final RuntimeFactory runtimeFactory;
-
- public PulsarFunctionsRuntimeTest(RuntimeFactory runtimeFactory) {
- this.runtimeFactory = runtimeFactory;
+ @Factory(dataProvider = "FunctionRuntimeTypes")
+ PulsarFunctionsRuntimeTest(FunctionRuntimeType functionRuntimeType) {
+ super(functionRuntimeType);
}
//
@@ -57,7 +62,7 @@ public PulsarFunctionsRuntimeTest(RuntimeFactory
runtimeFactory) {
@Test(dataProvider = "FunctionRuntimes")
public void testExclamationFunction(Runtime runtime) throws Exception {
- if (runtimeFactory == RuntimeFactory.THREAD && runtime ==
Runtime.PYTHON) {
+ if (functionRuntimeType == FunctionRuntimeType.THREAD && runtime ==
Runtime.PYTHON) {
// python can only run on process mode
return;
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services