This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8de127d Add integration test for kafka source (#2171)
8de127d is described below
commit 8de127d65f0a3eb8eb9bad0194d305f24d3ed791
Author: Sijie Guo <[email protected]>
AuthorDate: Tue Jul 17 15:08:13 2018 -0700
Add integration test for kafka source (#2171)
*Motivation*
We added integration tests for kafka & cassandra sinks. We need test
coverage on kafka sources.
*Changes*
- Add `PulsarIOSourceTest` and `SourceTester` for testing sources
- Implement `KafkaSourceTester` for testing kafka source
---
.../tests/integration/io/KafkaSourceTester.java | 140 +++++++++++
.../tests/integration/io/PulsarIOSourceTest.java | 255 +++++++++++++++++++++
.../pulsar/tests/integration/io/SourceTester.java | 52 +++++
3 files changed, 447 insertions(+)
diff --git
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSourceTester.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSourceTester.java
new file mode 100644
index 0000000..e690d6b
--- /dev/null
+++
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSourceTester.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import static org.testng.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.UUID;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.pulsar.tests.integration.utils.TestUtils;
+import org.testcontainers.containers.Container.ExecResult;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
+import org.testng.collections.Maps;
+
+/**
+ * A tester for testing kafka source.
+ *
+ * <p>It runs a Kafka container (with embedded ZooKeeper) as an external service, creates a
+ * fresh single-partition topic, and produces string key/value records that the Kafka source
+ * connector is expected to forward into Pulsar.
+ */
+@Slf4j
+public class KafkaSourceTester extends SourceTester {
+
+    private static final String NAME = "kafka";
+
+    /** Randomized per-instance topic name so separate tester instances do not share a topic. */
+    private final String kafkaTopicName;
+
+    private KafkaContainer kafkaContainer;
+
+    // NOTE(review): this consumer is created and subscribed in prepareSource() but is never
+    // polled or closed anywhere in this class; confirm whether it is still needed.
+    private KafkaConsumer<String, String> kafkaConsumer;
+
+    public KafkaSourceTester() {
+        super(NAME);
+        String suffix = TestUtils.randomName(8) + "_" + System.currentTimeMillis();
+        this.kafkaTopicName = "kafka_source_topic_" + suffix;
+
+        // The connector addresses the broker by its container network alias (NAME, set in
+        // newSourceService), not by the host-mapped bootstrap servers used in this class.
+        sourceConfig.put("bootstrapServers", NAME + ":9092");
+        sourceConfig.put("groupId", "test-source-group");
+        sourceConfig.put("fetchMinBytes", 1L);
+        sourceConfig.put("autoCommitIntervalMs", 10L);
+        sourceConfig.put("sessionTimeoutMs", 10000L);
+        sourceConfig.put("topic", kafkaTopicName);
+        sourceConfig.put("valueDeserializationClass",
+            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+    }
+
+    /**
+     * Creates the Kafka container that backs this source.
+     *
+     * @param clusterName name of the Pulsar test cluster, used to derive the container host name
+     * @return the containers to start alongside the cluster, keyed by service name
+     */
+    @Override
+    protected Map<String, GenericContainer<?>> newSourceService(String clusterName) {
+        this.kafkaContainer = new KafkaContainer()
+            .withEmbeddedZookeeper()
+            .withNetworkAliases(NAME)
+            .withCreateContainerCmdModifier(createContainerCmd -> createContainerCmd
+                .withName(NAME)
+                .withHostName(clusterName + "-" + NAME));
+
+        Map<String, GenericContainer<?>> containers = Maps.newHashMap();
+        containers.put(NAME, kafkaContainer);
+        return containers;
+    }
+
+    /**
+     * Creates the Kafka topic the source reads from and subscribes a consumer to it.
+     *
+     * @throws Exception if the topic-creation command cannot be executed in the container
+     */
+    @Override
+    protected void prepareSource() throws Exception {
+        ExecResult execResult = kafkaContainer.execInContainer(
+            "/usr/bin/kafka-topics",
+            "--create",
+            "--zookeeper",
+            "localhost:2181",
+            "--partitions",
+            "1",
+            "--replication-factor",
+            "1",
+            "--topic",
+            kafkaTopicName);
+        assertTrue(
+            execResult.getStdout().contains("Created topic"),
+            execResult.getStdout());
+
+        kafkaConsumer = new KafkaConsumer<>(
+            ImmutableMap.of(
+                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers(),
+                ConsumerConfig.GROUP_ID_CONFIG, "source-test-" + TestUtils.randomName(8),
+                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"
+            ),
+            new StringDeserializer(),
+            new StringDeserializer()
+        );
+        kafkaConsumer.subscribe(Arrays.asList(kafkaTopicName));
+        log.info("Successfully subscribe to kafka topic {}", kafkaTopicName);
+    }
+
+    /**
+     * Produces {@code numMessages} records ({@code key-i -> value-i}) to the Kafka topic,
+     * waiting for each send to complete before issuing the next one.
+     *
+     * @param numMessages number of records to produce
+     * @return the produced key/value pairs, in production order
+     * @throws Exception if a send fails or the wait is interrupted
+     */
+    @Override
+    protected Map<String, String> produceSourceMessages(int numMessages) throws Exception {
+        LinkedHashMap<String, String> kvs = new LinkedHashMap<>();
+        // KafkaProducer is Closeable; release it once all sends have completed.
+        try (KafkaProducer<String, String> producer = new KafkaProducer<>(
+                ImmutableMap.of(
+                    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers(),
+                    ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString()
+                ),
+                new StringSerializer(),
+                new StringSerializer())) {
+            for (int i = 0; i < numMessages; i++) {
+                String key = "key-" + i;
+                String value = "value-" + i;
+                ProducerRecord<String, String> record = new ProducerRecord<>(
+                    kafkaTopicName,
+                    key,
+                    value
+                );
+                kvs.put(key, value);
+                // Wait for each send so records are produced strictly one after another.
+                producer.send(record).get();
+            }
+        }
+
+        log.info("Successfully produce {} messages to kafka topic {}", numMessages, kafkaTopicName);
+        return kvs;
+    }
+}
diff --git
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSourceTest.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSourceTest.java
new file mode 100644
index 0000000..98cb1f2
--- /dev/null
+++
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSourceTest.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import com.google.common.base.Stopwatch;
+import com.google.gson.Gson;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.tests.integration.functions.PulsarFunctionsTestBase;
+import org.apache.pulsar.tests.topologies.FunctionRuntimeType;
+import org.apache.pulsar.tests.topologies.PulsarCluster;
+import
org.apache.pulsar.tests.topologies.PulsarClusterSpec.PulsarClusterSpecBuilder;
+import org.testcontainers.containers.Container.ExecResult;
+import org.testcontainers.containers.GenericContainer;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Factory;
+import org.testng.annotations.Test;
+import org.testng.collections.Maps;
+
+/**
+ * A test base for testing source.
+ */
+@Slf4j
+public class PulsarIOSourceTest extends PulsarFunctionsTestBase {
+
+ @DataProvider(name = "Sources")
+ public static Object[][] getData() {
+ return new Object[][] {
+ { FunctionRuntimeType.PROCESS, new KafkaSourceTester() },
+ { FunctionRuntimeType.THREAD, new KafkaSourceTester() }
+ };
+ }
+
+ protected final SourceTester tester;
+
+ @Factory(dataProvider = "Sources")
+ PulsarIOSourceTest(FunctionRuntimeType functionRuntimeType, SourceTester
tester) {
+ super(functionRuntimeType);
+ this.tester = tester;
+ }
+
+ @Override
+ protected PulsarClusterSpecBuilder beforeSetupCluster(String clusterName,
+
PulsarClusterSpecBuilder specBuilder) {
+ Map<String, GenericContainer<?>> externalServices = Maps.newHashMap();
+ externalServices.putAll(tester.newSourceService(clusterName));
+ return super.beforeSetupCluster(clusterName, specBuilder)
+ .externalServices(externalServices);
+ }
+
+ @Test
+ public void testSource() throws Exception {
+ final String tenant = TopicName.PUBLIC_TENANT;
+ final String namespace = TopicName.DEFAULT_NAMESPACE;
+ final String outputTopicName = "test-source-connector-output-topic-" +
randomName(8);
+ final String sourceName = "test-source-connector-name-" +
randomName(8);
+ final int numMessages = 20;
+
+ @Cleanup
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+ .build();
+
+ @Cleanup
+ Consumer<String> consumer = client.newConsumer(Schema.STRING)
+ .topic(outputTopicName)
+ .subscriptionName("source-tester")
+ .subscriptionType(SubscriptionType.Exclusive)
+ .subscribe();
+
+ // prepare the testing environment for source
+ prepareSource();
+
+ // submit the source connector
+ submitSourceConnector(tenant, namespace, sourceName, outputTopicName);
+
+ // get source info
+ getSourceInfoSuccess(tenant, namespace, sourceName);
+
+ // get source status
+ getSourceStatus(tenant, namespace, sourceName);
+
+ // produce messages
+ Map<String, String> kvs = tester.produceSourceMessages(numMessages);
+
+ // wait for source to process messages
+ waitForProcessingMessages(tenant, namespace, sourceName, numMessages);
+
+ // validate the source result
+ validateSourceResult(consumer, kvs);
+
+ // delete the source
+ deleteSource(tenant, namespace, sourceName);
+
+ // get source info (source should be deleted)
+ getSourceInfoNotFound(tenant, namespace, sourceName);
+ }
+
+ protected void prepareSource() throws Exception {
+ tester.prepareSource();
+ }
+
+ protected void submitSourceConnector(String tenant,
+ String namespace,
+ String sourceName,
+ String outputTopicName) throws
Exception {
+ String[] commands = {
+ PulsarCluster.ADMIN_SCRIPT,
+ "source", "create",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sourceName,
+ "--source-type", tester.sourceType(),
+ "--sourceConfig", new Gson().toJson(tester.sourceConfig()),
+ "--destinationTopicName", outputTopicName
+ };
+ log.info("Run command : {}", StringUtils.join(commands, ' '));
+ ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+ assertTrue(
+ result.getStdout().contains("\"Created successfully\""),
+ result.getStdout());
+ }
+
+ protected void getSourceInfoSuccess(String tenant, String namespace,
String sourceName) throws Exception {
+ String[] commands = {
+ PulsarCluster.ADMIN_SCRIPT,
+ "functions",
+ "get",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sourceName
+ };
+ ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+ log.info("Get source info : {}", result.getStdout());
+ assertTrue(
+ result.getStdout().contains("\"builtin\": \"" + tester.sourceType
+ "\""),
+ result.getStdout()
+ );
+ }
+
+ protected void getSourceStatus(String tenant, String namespace, String
sourceName) throws Exception {
+ String[] commands = {
+ PulsarCluster.ADMIN_SCRIPT,
+ "functions",
+ "getstatus",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sourceName
+ };
+ while (true) {
+ ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+ log.info("Get source status : {}", result.getStdout());
+ if (result.getStdout().contains("\"running\": true")) {
+ return;
+ }
+ log.info("Backoff 1 second until the function is running");
+ TimeUnit.SECONDS.sleep(1);
+ }
+ }
+
+ protected void validateSourceResult(Consumer<String> consumer,
+ Map<String, String> kvs) throws
Exception {
+ for (Map.Entry<String, String> kv : kvs.entrySet()) {
+ Message<String> msg = consumer.receive();
+ assertEquals(kv.getKey(), msg.getKey());
+ assertEquals(kv.getValue(), msg.getValue());
+ }
+ }
+
+ protected void waitForProcessingMessages(String tenant,
+ String namespace,
+ String sourceName,
+ int numMessages) throws Exception
{
+ String[] commands = {
+ PulsarCluster.ADMIN_SCRIPT,
+ "functions",
+ "getstatus",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sourceName
+ };
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ while (true) {
+ ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+ log.info("Get source status : {}", result.getStdout());
+ if (result.getStdout().contains("\"numProcessed\": \"" +
numMessages + "\"")) {
+ return;
+ }
+ log.info("{} ms has elapsed but the source hasn't process {}
messages, backoff to wait for another 1 second",
+ stopwatch.elapsed(TimeUnit.MILLISECONDS), numMessages);
+ TimeUnit.SECONDS.sleep(1);
+ }
+ }
+
+ protected void deleteSource(String tenant, String namespace, String
sourceName) throws Exception {
+ String[] commands = {
+ PulsarCluster.ADMIN_SCRIPT,
+ "source",
+ "delete",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sourceName
+ };
+ ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+ assertTrue(
+ result.getStdout().contains("Delete source successfully"),
+ result.getStdout()
+ );
+ assertTrue(
+ result.getStderr().isEmpty(),
+ result.getStderr()
+ );
+ }
+
+ protected void getSourceInfoNotFound(String tenant, String namespace,
String sourceName) throws Exception {
+ String[] commands = {
+ PulsarCluster.ADMIN_SCRIPT,
+ "functions",
+ "get",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sourceName
+ };
+ ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+ assertTrue(result.getStderr().contains("Reason: Function " +
sourceName + " doesn't exist"));
+ }
+}
diff --git
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java
new file mode 100644
index 0000000..a28d086
--- /dev/null
+++
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import java.util.Map;
+import org.testcontainers.containers.GenericContainer;
+import org.testng.collections.Maps;
+
+/**
+ * Base class for integration-test helpers that drive one kind of Pulsar IO source.
+ *
+ * <p>A subclass names its source type, fills {@link #sourceConfig} with the connector
+ * configuration, supplies the containers backing the external system, and knows how to
+ * prepare that system and feed messages into it.
+ */
+public abstract class SourceTester {
+
+    /** Source type handed to the connector (for example {@code "kafka"}). */
+    protected final String sourceType;
+
+    /** Connector configuration; left mutable so subclasses can populate it. */
+    protected final Map<String, Object> sourceConfig = Maps.newHashMap();
+
+    protected SourceTester(String sourceType) {
+        this.sourceType = sourceType;
+    }
+
+    /** @return the source type this tester exercises */
+    protected String sourceType() {
+        return sourceType;
+    }
+
+    /** @return the live (mutable) connector configuration map */
+    protected Map<String, Object> sourceConfig() {
+        return sourceConfig;
+    }
+
+    /**
+     * Creates the external-service containers the source connects to.
+     *
+     * @param clusterName name of the Pulsar cluster under test
+     * @return containers keyed by service name
+     */
+    protected abstract Map<String, GenericContainer<?>> newSourceService(String clusterName);
+
+    /** Prepares the external system before the connector is submitted. */
+    protected abstract void prepareSource() throws Exception;
+
+    /**
+     * Writes {@code numMessages} messages into the external system.
+     *
+     * @return the produced key/value pairs, iterated in the order they are expected downstream
+     */
+    protected abstract Map<String, String> produceSourceMessages(int numMessages) throws Exception;
+
+}