sijie closed pull request #2171: Add integration test for kafka source
URL: https://github.com/apache/incubator-pulsar/pull/2171
This pull request was merged from a forked repository.
Because GitHub hides the original diff of a fork-based pull request once
it has been merged, the diff is reproduced below for the sake of provenance:
diff --git a/pom.xml b/pom.xml
index a6d645379b..263d3624d2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -799,6 +799,11 @@ flexible messaging model and an intuitive client
API.</description>
<artifactId>testcontainers</artifactId>
<version>${testcontainers.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>kafka</artifactId>
+ <version>${testcontainers.version}</version>
+ </dependency>
<dependency>
<groupId>org.arquillian.cube</groupId>
<artifactId>arquillian-cube-docker</artifactId>
diff --git
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/KafkaContainer.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/KafkaContainer.java
deleted file mode 100644
index 83b5e42ba0..0000000000
---
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/KafkaContainer.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.tests.containers;
-
-import lombok.extern.slf4j.Slf4j;
-import org.testcontainers.containers.BindMode;
-import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
-
-/**
- * Cassandra Container.
- */
-@Slf4j
-public class KafkaContainer<SelfT extends ChaosContainer<SelfT>> extends
ChaosContainer<SelfT> {
-
- public static final String NAME = "kafka";
- public static final int INTERNAL_PORT = 9092;
- public static final int PORT = 9093;
-
- public KafkaContainer(String clusterName) {
- super(clusterName, "confluentinc/cp-kafka:4.1.1");
- }
-
- @Override
- protected void configure() {
- super.configure();
- this.withNetworkAliases(NAME)
- .withExposedPorts(INTERNAL_PORT, PORT)
- .withClasspathResourceMapping(
- "kafka-zookeeper.properties", "/zookeeper.properties",
- BindMode.READ_ONLY)
- .withCommand("sh", "-c", "zookeeper-server-start
/zookeeper.properties & /etc/confluent/docker/run")
- .withEnv("KAFKA_LISTENERS",
- "INTERNAL://kafka:" + INTERNAL_PORT + ",PLAINTEXT://" +
"0.0.0.0" + ":" + PORT)
- .withEnv("KAFKA_ZOOKEEPER_CONNECT", "localhost:2181")
- .withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
"INTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
- .withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "INTERNAL")
- .withEnv("KAFKA_BROKER_ID", "1")
- .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
- .withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1")
- .withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + "")
- .withCreateContainerCmdModifier(createContainerCmd -> {
- createContainerCmd.withHostName(NAME);
- createContainerCmd.withName(clusterName + "-" + NAME);
- })
- .waitingFor(new HostPortWaitStrategy());
- }
-}
diff --git
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/KafkaProxyContainer.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/KafkaProxyContainer.java
deleted file mode 100644
index 052db7e8b0..0000000000
---
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/KafkaProxyContainer.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.tests.containers;
-
-import lombok.extern.slf4j.Slf4j;
-import org.testcontainers.containers.SocatContainer;
-import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
-
-/**
- * Cassandra Container.
- */
-@Slf4j
-public class KafkaProxyContainer extends SocatContainer {
-
- public static final String NAME = "kafka-proxy";
-
- private final String clusterName;
-
- public KafkaProxyContainer(String clusterName) {
- super();
- this.clusterName = clusterName;
- }
-
- @Override
- protected void configure() {
- super.configure();
- this.withNetworkAliases(NAME)
- .withTarget(KafkaContainer.PORT, KafkaContainer.NAME)
- .withCreateContainerCmdModifier(createContainerCmd -> {
- createContainerCmd.withHostName(NAME);
- createContainerCmd.withName(clusterName + "-" + NAME);
- })
- .waitingFor(new HostPortWaitStrategy());
- }
-}
diff --git a/tests/integration/semantics/pom.xml
b/tests/integration/semantics/pom.xml
index b98237ada4..09258dba21 100644
--- a/tests/integration/semantics/pom.xml
+++ b/tests/integration/semantics/pom.xml
@@ -58,5 +58,16 @@
<artifactId>cassandra-driver-core</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>kafka</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-io-kafka</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
index 483b564d40..6bbfbe3a25 100644
---
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
+++
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
@@ -25,17 +25,20 @@
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
+import com.google.common.collect.Lists;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.tests.containers.CassandraContainer;
import org.apache.pulsar.tests.integration.utils.TestUtils;
+import org.testcontainers.containers.GenericContainer;
+import org.testng.collections.Maps;
/**
* A tester for testing cassandra sink.
*/
@Slf4j
-public class CassandraSinkTester extends SinkTester<CassandraContainer> {
+public class CassandraSinkTester extends SinkTester {
private static final String ROOTS = "cassandra";
private static final String KEY = "key";
@@ -64,9 +67,11 @@ public CassandraSinkTester() {
}
@Override
- protected CassandraContainer newSinkService(String clusterName) {
+ protected Map<String, GenericContainer<?>> newSinkService(String
clusterName) {
this.cassandraCluster = new CassandraContainer(clusterName);
- return this.cassandraCluster;
+ Map<String, GenericContainer<?>> containers = Maps.newHashMap();
+ containers.put(CassandraContainer.NAME, cassandraCluster);
+ return containers;
}
@Override
diff --git
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
new file mode 100644
index 0000000000..66595316c7
--- /dev/null
+++
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.pulsar.tests.integration.utils.TestUtils;
+import org.testcontainers.containers.Container.ExecResult;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
+import org.testng.collections.Maps;
+
+/**
+ * A tester for testing kafka sink.
+ */
+@Slf4j
+public class KafkaSinkTester extends SinkTester {
+
+ private static final String NAME = "kafka";
+
+ private final String kafkaTopicName;
+
+ private KafkaContainer kafkaContainer;
+
+ private KafkaConsumer<String, String> kafkaConsumer;
+
+ public KafkaSinkTester() {
+ super(NAME);
+ String suffix = TestUtils.randomName(8) + "_" +
System.currentTimeMillis();
+ this.kafkaTopicName = "kafka_sink_topic_" + suffix;
+
+ sinkConfig.put("bootstrapServers", NAME + ":9092");
+ sinkConfig.put("acks", "all");
+ sinkConfig.put("batchSize", 1L);
+ sinkConfig.put("maxRequestSize", 1048576L);
+ sinkConfig.put("topic", kafkaTopicName);
+ }
+
+ @Override
+ protected Map<String, GenericContainer<?>> newSinkService(String
clusterName) {
+ this.kafkaContainer = new KafkaContainer()
+ .withEmbeddedZookeeper()
+ .withNetworkAliases(NAME)
+ .withCreateContainerCmdModifier(createContainerCmd ->
createContainerCmd
+ .withName(NAME)
+ .withHostName(clusterName + "-" + NAME));
+
+ Map<String, GenericContainer<?>> containers = Maps.newHashMap();
+ containers.put("kafka", kafkaContainer);
+ return containers;
+ }
+
+ @Override
+ protected void prepareSink() throws Exception {
+ ExecResult execResult = kafkaContainer.execInContainer(
+ "/usr/bin/kafka-topics",
+ "--create",
+ "--zookeeper",
+ "localhost:2181",
+ "--partitions",
+ "1",
+ "--replication-factor",
+ "1",
+ "--topic",
+ kafkaTopicName);
+ assertTrue(
+ execResult.getStdout().contains("Created topic"),
+ execResult.getStdout());
+
+ kafkaConsumer = new KafkaConsumer<>(
+ ImmutableMap.of(
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaContainer.getBootstrapServers(),
+ ConsumerConfig.GROUP_ID_CONFIG, "sink-test-" +
TestUtils.randomName(8),
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"
+ ),
+ new StringDeserializer(),
+ new StringDeserializer()
+ );
+ kafkaConsumer.subscribe(Arrays.asList(kafkaTopicName));
+ log.info("Successfully subscribe to kafka topic {}", kafkaTopicName);
+ }
+
+ @Override
+ protected void validateSinkResult(Map<String, String> kvs) {
+ Iterator<Map.Entry<String, String>> kvIter = kvs.entrySet().iterator();
+ while (kvIter.hasNext()) {
+ ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
+ log.info("Received {} records from kafka topic {}",
+ records.count(), kafkaTopicName);
+ if (records.isEmpty()) {
+ continue;
+ }
+
+ Iterator<ConsumerRecord<String, String>> recordsIter =
records.iterator();
+ while (recordsIter.hasNext() && kvIter.hasNext()) {
+ ConsumerRecord<String, String> consumerRecord =
recordsIter.next();
+ Map.Entry<String, String> expectedRecord = kvIter.next();
+ assertEquals(expectedRecord.getKey(), consumerRecord.key());
+ assertEquals(expectedRecord.getValue(),
consumerRecord.value());
+ }
+ }
+ }
+}
diff --git
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSourceTester.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSourceTester.java
new file mode 100644
index 0000000000..e690d6beb5
--- /dev/null
+++
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSourceTester.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import static org.testng.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.UUID;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.pulsar.tests.integration.utils.TestUtils;
+import org.testcontainers.containers.Container.ExecResult;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
+import org.testng.collections.Maps;
+
+/**
+ * A tester for testing kafka source.
+ */
+@Slf4j
+public class KafkaSourceTester extends SourceTester {
+
+ private static final String NAME = "kafka";
+
+ private final String kafkaTopicName;
+
+ private KafkaContainer kafkaContainer;
+
+ private KafkaConsumer<String, String> kafkaConsumer;
+
+ public KafkaSourceTester() {
+ super(NAME);
+ String suffix = TestUtils.randomName(8) + "_" +
System.currentTimeMillis();
+ this.kafkaTopicName = "kafka_source_topic_" + suffix;
+
+ sourceConfig.put("bootstrapServers", NAME + ":9092");
+ sourceConfig.put("groupId", "test-source-group");
+ sourceConfig.put("fetchMinBytes", 1L);
+ sourceConfig.put("autoCommitIntervalMs", 10L);
+ sourceConfig.put("sessionTimeoutMs", 10000L);
+ sourceConfig.put("topic", kafkaTopicName);
+ sourceConfig.put("valueDeserializationClass",
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ }
+
+ @Override
+ protected Map<String, GenericContainer<?>> newSourceService(String
clusterName) {
+ this.kafkaContainer = new KafkaContainer()
+ .withEmbeddedZookeeper()
+ .withNetworkAliases(NAME)
+ .withCreateContainerCmdModifier(createContainerCmd ->
createContainerCmd
+ .withName(NAME)
+ .withHostName(clusterName + "-" + NAME));
+
+ Map<String, GenericContainer<?>> containers = Maps.newHashMap();
+ containers.put("kafka", kafkaContainer);
+ return containers;
+ }
+
+ @Override
+ protected void prepareSource() throws Exception {
+ ExecResult execResult = kafkaContainer.execInContainer(
+ "/usr/bin/kafka-topics",
+ "--create",
+ "--zookeeper",
+ "localhost:2181",
+ "--partitions",
+ "1",
+ "--replication-factor",
+ "1",
+ "--topic",
+ kafkaTopicName);
+ assertTrue(
+ execResult.getStdout().contains("Created topic"),
+ execResult.getStdout());
+
+ kafkaConsumer = new KafkaConsumer<>(
+ ImmutableMap.of(
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaContainer.getBootstrapServers(),
+ ConsumerConfig.GROUP_ID_CONFIG, "source-test-" +
TestUtils.randomName(8),
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"
+ ),
+ new StringDeserializer(),
+ new StringDeserializer()
+ );
+ kafkaConsumer.subscribe(Arrays.asList(kafkaTopicName));
+ log.info("Successfully subscribe to kafka topic {}", kafkaTopicName);
+ }
+
+ @Override
+ protected Map<String, String> produceSourceMessages(int numMessages)
throws Exception{
+ KafkaProducer<String, String> producer = new KafkaProducer<>(
+ ImmutableMap.of(
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaContainer.getBootstrapServers(),
+ ProducerConfig.CLIENT_ID_CONFIG,
UUID.randomUUID().toString()
+ ),
+ new StringSerializer(),
+ new StringSerializer()
+ );
+ LinkedHashMap<String, String> kvs = new LinkedHashMap<>();
+ for (int i = 0; i < numMessages; i++) {
+ String key = "key-" + i;
+ String value = "value-" + i;
+ ProducerRecord<String, String> record = new ProducerRecord<>(
+ kafkaTopicName,
+ key,
+ value
+ );
+ kvs.put(key, value);
+ producer.send(record).get();
+ }
+
+ log.info("Successfully produce {} messages to kafka topic {}",
kafkaTopicName);
+ return kvs;
+ }
+}
diff --git
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSinkTest.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSinkTest.java
index 52719bff70..464a716fcd 100644
---
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSinkTest.java
+++
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSinkTest.java
@@ -22,6 +22,7 @@
import com.google.common.base.Stopwatch;
import com.google.gson.Gson;
+import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
@@ -52,7 +53,9 @@
public static Object[][] getData() {
return new Object[][] {
{ FunctionRuntimeType.PROCESS, new CassandraSinkTester() },
- { FunctionRuntimeType.THREAD, new CassandraSinkTester() }
+ { FunctionRuntimeType.THREAD, new CassandraSinkTester() },
+ { FunctionRuntimeType.PROCESS, new KafkaSinkTester() },
+ { FunctionRuntimeType.THREAD, new KafkaSinkTester() }
};
}
@@ -68,7 +71,7 @@
protected PulsarClusterSpecBuilder beforeSetupCluster(String clusterName,
PulsarClusterSpecBuilder specBuilder) {
Map<String, GenericContainer<?>> externalServices = Maps.newHashMap();
- externalServices.put(tester.sinkType,
tester.newSinkService(clusterName));
+ externalServices.putAll(tester.newSinkService(clusterName));
return super.beforeSetupCluster(clusterName, specBuilder)
.externalServices(externalServices);
}
@@ -109,7 +112,7 @@ public void testSink() throws Exception {
getSinkInfoNotFound(tenant, namespace, sinkName);
}
- protected void prepareSink() {
+ protected void prepareSink() throws Exception {
tester.prepareSink();
}
@@ -146,7 +149,7 @@ protected void getSinkInfoSuccess(String tenant, String
namespace, String sinkNa
ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
log.info("Get sink info : {}", result.getStdout());
assertTrue(
- result.getStdout().contains("\"builtin\": \"cassandra\""),
+ result.getStdout().contains("\"builtin\": \"" + tester.sinkType +
"\""),
result.getStdout()
);
}
@@ -181,7 +184,7 @@ protected void getSinkStatus(String tenant, String
namespace, String sinkName) t
Producer<String> producer = client.newProducer(Schema.STRING)
.topic(inputTopicName)
.create();
- Map<String, String> kvs = Maps.newHashMap();
+ LinkedHashMap<String, String> kvs = new LinkedHashMap<>();
for (int i = 0; i < numMessages; i++) {
String key = "key-" + i;
String value = "value-" + i;
diff --git
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSourceTest.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSourceTest.java
new file mode 100644
index 0000000000..98cb1f2276
--- /dev/null
+++
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSourceTest.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import com.google.common.base.Stopwatch;
+import com.google.gson.Gson;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.tests.integration.functions.PulsarFunctionsTestBase;
+import org.apache.pulsar.tests.topologies.FunctionRuntimeType;
+import org.apache.pulsar.tests.topologies.PulsarCluster;
+import
org.apache.pulsar.tests.topologies.PulsarClusterSpec.PulsarClusterSpecBuilder;
+import org.testcontainers.containers.Container.ExecResult;
+import org.testcontainers.containers.GenericContainer;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Factory;
+import org.testng.annotations.Test;
+import org.testng.collections.Maps;
+
+/**
+ * A test base for testing source.
+ */
+@Slf4j
+public class PulsarIOSourceTest extends PulsarFunctionsTestBase {
+
+ @DataProvider(name = "Sources")
+ public static Object[][] getData() {
+ return new Object[][] {
+ { FunctionRuntimeType.PROCESS, new KafkaSourceTester() },
+ { FunctionRuntimeType.THREAD, new KafkaSourceTester() }
+ };
+ }
+
+ protected final SourceTester tester;
+
+ @Factory(dataProvider = "Sources")
+ PulsarIOSourceTest(FunctionRuntimeType functionRuntimeType, SourceTester
tester) {
+ super(functionRuntimeType);
+ this.tester = tester;
+ }
+
+ @Override
+ protected PulsarClusterSpecBuilder beforeSetupCluster(String clusterName,
+
PulsarClusterSpecBuilder specBuilder) {
+ Map<String, GenericContainer<?>> externalServices = Maps.newHashMap();
+ externalServices.putAll(tester.newSourceService(clusterName));
+ return super.beforeSetupCluster(clusterName, specBuilder)
+ .externalServices(externalServices);
+ }
+
+ @Test
+ public void testSource() throws Exception {
+ final String tenant = TopicName.PUBLIC_TENANT;
+ final String namespace = TopicName.DEFAULT_NAMESPACE;
+ final String outputTopicName = "test-source-connector-output-topic-" +
randomName(8);
+ final String sourceName = "test-source-connector-name-" +
randomName(8);
+ final int numMessages = 20;
+
+ @Cleanup
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+ .build();
+
+ @Cleanup
+ Consumer<String> consumer = client.newConsumer(Schema.STRING)
+ .topic(outputTopicName)
+ .subscriptionName("source-tester")
+ .subscriptionType(SubscriptionType.Exclusive)
+ .subscribe();
+
+ // prepare the testing environment for source
+ prepareSource();
+
+ // submit the source connector
+ submitSourceConnector(tenant, namespace, sourceName, outputTopicName);
+
+ // get source info
+ getSourceInfoSuccess(tenant, namespace, sourceName);
+
+ // get source status
+ getSourceStatus(tenant, namespace, sourceName);
+
+ // produce messages
+ Map<String, String> kvs = tester.produceSourceMessages(numMessages);
+
+ // wait for source to process messages
+ waitForProcessingMessages(tenant, namespace, sourceName, numMessages);
+
+ // validate the source result
+ validateSourceResult(consumer, kvs);
+
+ // delete the source
+ deleteSource(tenant, namespace, sourceName);
+
+ // get source info (source should be deleted)
+ getSourceInfoNotFound(tenant, namespace, sourceName);
+ }
+
+ protected void prepareSource() throws Exception {
+ tester.prepareSource();
+ }
+
+ protected void submitSourceConnector(String tenant,
+ String namespace,
+ String sourceName,
+ String outputTopicName) throws
Exception {
+ String[] commands = {
+ PulsarCluster.ADMIN_SCRIPT,
+ "source", "create",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sourceName,
+ "--source-type", tester.sourceType(),
+ "--sourceConfig", new Gson().toJson(tester.sourceConfig()),
+ "--destinationTopicName", outputTopicName
+ };
+ log.info("Run command : {}", StringUtils.join(commands, ' '));
+ ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+ assertTrue(
+ result.getStdout().contains("\"Created successfully\""),
+ result.getStdout());
+ }
+
+ protected void getSourceInfoSuccess(String tenant, String namespace,
String sourceName) throws Exception {
+ String[] commands = {
+ PulsarCluster.ADMIN_SCRIPT,
+ "functions",
+ "get",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sourceName
+ };
+ ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+ log.info("Get source info : {}", result.getStdout());
+ assertTrue(
+ result.getStdout().contains("\"builtin\": \"" + tester.sourceType
+ "\""),
+ result.getStdout()
+ );
+ }
+
+ protected void getSourceStatus(String tenant, String namespace, String
sourceName) throws Exception {
+ String[] commands = {
+ PulsarCluster.ADMIN_SCRIPT,
+ "functions",
+ "getstatus",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sourceName
+ };
+ while (true) {
+ ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+ log.info("Get source status : {}", result.getStdout());
+ if (result.getStdout().contains("\"running\": true")) {
+ return;
+ }
+ log.info("Backoff 1 second until the function is running");
+ TimeUnit.SECONDS.sleep(1);
+ }
+ }
+
+ protected void validateSourceResult(Consumer<String> consumer,
+ Map<String, String> kvs) throws
Exception {
+ for (Map.Entry<String, String> kv : kvs.entrySet()) {
+ Message<String> msg = consumer.receive();
+ assertEquals(kv.getKey(), msg.getKey());
+ assertEquals(kv.getValue(), msg.getValue());
+ }
+ }
+
+ protected void waitForProcessingMessages(String tenant,
+ String namespace,
+ String sourceName,
+ int numMessages) throws Exception
{
+ String[] commands = {
+ PulsarCluster.ADMIN_SCRIPT,
+ "functions",
+ "getstatus",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sourceName
+ };
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ while (true) {
+ ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+ log.info("Get source status : {}", result.getStdout());
+ if (result.getStdout().contains("\"numProcessed\": \"" +
numMessages + "\"")) {
+ return;
+ }
+ log.info("{} ms has elapsed but the source hasn't process {}
messages, backoff to wait for another 1 second",
+ stopwatch.elapsed(TimeUnit.MILLISECONDS), numMessages);
+ TimeUnit.SECONDS.sleep(1);
+ }
+ }
+
+ protected void deleteSource(String tenant, String namespace, String
sourceName) throws Exception {
+ String[] commands = {
+ PulsarCluster.ADMIN_SCRIPT,
+ "source",
+ "delete",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sourceName
+ };
+ ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+ assertTrue(
+ result.getStdout().contains("Delete source successfully"),
+ result.getStdout()
+ );
+ assertTrue(
+ result.getStderr().isEmpty(),
+ result.getStderr()
+ );
+ }
+
+ protected void getSourceInfoNotFound(String tenant, String namespace,
String sourceName) throws Exception {
+ String[] commands = {
+ PulsarCluster.ADMIN_SCRIPT,
+ "functions",
+ "get",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sourceName
+ };
+ ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+ assertTrue(result.getStderr().contains("Reason: Function " +
sourceName + " doesn't exist"));
+ }
+}
diff --git
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
index dbc5884881..75434be529 100644
---
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
+++
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
@@ -25,7 +25,7 @@
/**
* A tester used for testing a specific sink.
*/
-public abstract class SinkTester<SINK_SERVICE_CONTAINER extends
GenericContainer> {
+public abstract class SinkTester {
protected final String sinkType;
protected final Map<String, Object> sinkConfig;
@@ -35,7 +35,7 @@ protected SinkTester(String sinkType) {
this.sinkConfig = Maps.newHashMap();
}
- protected abstract SINK_SERVICE_CONTAINER newSinkService(String
clusterName);
+ protected abstract Map<String, GenericContainer<?>> newSinkService(String
clusterName);
protected String sinkType() {
return sinkType;
@@ -45,7 +45,7 @@ protected String sinkType() {
return sinkConfig;
}
- protected abstract void prepareSink();
+ protected abstract void prepareSink() throws Exception;
protected abstract void validateSinkResult(Map<String, String> kvs);
diff --git
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java
new file mode 100644
index 0000000000..a28d0861d2
--- /dev/null
+++
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import java.util.Map;
+import org.testcontainers.containers.GenericContainer;
+import org.testng.collections.Maps;
+
+/**
+ * A tester used for testing a specific source.
+ */
+public abstract class SourceTester {
+
+ protected final String sourceType;
+ protected final Map<String, Object> sourceConfig;
+
+ protected SourceTester(String sourceType) {
+ this.sourceType = sourceType;
+ this.sourceConfig = Maps.newHashMap();
+ }
+
+ protected abstract Map<String, GenericContainer<?>>
newSourceService(String clusterName);
+
+ protected String sourceType() {
+ return sourceType;
+ }
+
+ protected Map<String, Object> sourceConfig() {
+ return sourceConfig;
+ }
+
+ protected abstract void prepareSource() throws Exception;
+
+ protected abstract Map<String, String> produceSourceMessages(int
numMessages) throws Exception;
+
+}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to this message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services