This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8de127d  Add integration test for kafka source (#2171)
8de127d is described below

commit 8de127d65f0a3eb8eb9bad0194d305f24d3ed791
Author: Sijie Guo <[email protected]>
AuthorDate: Tue Jul 17 15:08:13 2018 -0700

    Add integration test for kafka source (#2171)
    
    
    
    *Motivation*
    
    We added integration tests for kafka & cassandra sinks. We need test 
coverage on kafka sources.
    
    *Changes*
    
    - Add `PulsarIOSourceTest` and `SourceTester` for testing sources
    - Implement `KafkaSourceTester` for testing kafka source
---
 .../tests/integration/io/KafkaSourceTester.java    | 140 +++++++++++
 .../tests/integration/io/PulsarIOSourceTest.java   | 255 +++++++++++++++++++++
 .../pulsar/tests/integration/io/SourceTester.java  |  52 +++++
 3 files changed, 447 insertions(+)

diff --git 
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSourceTester.java
 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSourceTester.java
new file mode 100644
index 0000000..e690d6b
--- /dev/null
+++ 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSourceTester.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import static org.testng.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.UUID;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.pulsar.tests.integration.utils.TestUtils;
+import org.testcontainers.containers.Container.ExecResult;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
+import org.testng.collections.Maps;
+
+/**
+ * A tester for testing kafka source.
+ */
+@Slf4j
+public class KafkaSourceTester extends SourceTester {
+
+    private static final String NAME = "kafka";
+
+    private final String kafkaTopicName;
+
+    private KafkaContainer kafkaContainer;
+
+    private KafkaConsumer<String, String> kafkaConsumer;
+
+    public KafkaSourceTester() {
+        super(NAME);
+        String suffix = TestUtils.randomName(8) + "_" + 
System.currentTimeMillis();
+        this.kafkaTopicName = "kafka_source_topic_" + suffix;
+
+        sourceConfig.put("bootstrapServers", NAME + ":9092");
+        sourceConfig.put("groupId", "test-source-group");
+        sourceConfig.put("fetchMinBytes", 1L);
+        sourceConfig.put("autoCommitIntervalMs", 10L);
+        sourceConfig.put("sessionTimeoutMs", 10000L);
+        sourceConfig.put("topic", kafkaTopicName);
+        sourceConfig.put("valueDeserializationClass", 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+    }
+
+    @Override
+    protected Map<String, GenericContainer<?>> newSourceService(String 
clusterName) {
+        this.kafkaContainer = new KafkaContainer()
+            .withEmbeddedZookeeper()
+            .withNetworkAliases(NAME)
+            .withCreateContainerCmdModifier(createContainerCmd -> 
createContainerCmd
+                .withName(NAME)
+                .withHostName(clusterName + "-" + NAME));
+
+        Map<String, GenericContainer<?>> containers = Maps.newHashMap();
+        containers.put("kafka", kafkaContainer);
+        return containers;
+    }
+
+    @Override
+    protected void prepareSource() throws Exception {
+        ExecResult execResult = kafkaContainer.execInContainer(
+            "/usr/bin/kafka-topics",
+            "--create",
+            "--zookeeper",
+            "localhost:2181",
+            "--partitions",
+            "1",
+            "--replication-factor",
+            "1",
+            "--topic",
+            kafkaTopicName);
+        assertTrue(
+            execResult.getStdout().contains("Created topic"),
+            execResult.getStdout());
+
+        kafkaConsumer = new KafkaConsumer<>(
+            ImmutableMap.of(
+                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
kafkaContainer.getBootstrapServers(),
+                ConsumerConfig.GROUP_ID_CONFIG, "source-test-" + 
TestUtils.randomName(8),
+                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"
+            ),
+            new StringDeserializer(),
+            new StringDeserializer()
+        );
+        kafkaConsumer.subscribe(Arrays.asList(kafkaTopicName));
+        log.info("Successfully subscribe to kafka topic {}", kafkaTopicName);
+    }
+
+    @Override
+    protected Map<String, String> produceSourceMessages(int numMessages) 
throws Exception{
+        KafkaProducer<String, String> producer = new KafkaProducer<>(
+                ImmutableMap.of(
+                        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
kafkaContainer.getBootstrapServers(),
+                        ProducerConfig.CLIENT_ID_CONFIG, 
UUID.randomUUID().toString()
+                ),
+                new StringSerializer(),
+                new StringSerializer()
+        );
+        LinkedHashMap<String, String> kvs = new LinkedHashMap<>();
+        for (int i = 0; i < numMessages; i++) {
+            String key = "key-" + i;
+            String value = "value-" + i;
+            ProducerRecord<String, String> record = new ProducerRecord<>(
+                kafkaTopicName,
+                key,
+                value
+            );
+            kvs.put(key, value);
+            producer.send(record).get();
+        }
+
+        log.info("Successfully produce {} messages to kafka topic {}", 
kafkaTopicName);
+        return kvs;
+    }
+}
diff --git 
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSourceTest.java
 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSourceTest.java
new file mode 100644
index 0000000..98cb1f2
--- /dev/null
+++ 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSourceTest.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import com.google.common.base.Stopwatch;
+import com.google.gson.Gson;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.tests.integration.functions.PulsarFunctionsTestBase;
+import org.apache.pulsar.tests.topologies.FunctionRuntimeType;
+import org.apache.pulsar.tests.topologies.PulsarCluster;
+import 
org.apache.pulsar.tests.topologies.PulsarClusterSpec.PulsarClusterSpecBuilder;
+import org.testcontainers.containers.Container.ExecResult;
+import org.testcontainers.containers.GenericContainer;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Factory;
+import org.testng.annotations.Test;
+import org.testng.collections.Maps;
+
+/**
+ * A test base for testing source.
+ */
+@Slf4j
+public class PulsarIOSourceTest extends PulsarFunctionsTestBase {
+
+    @DataProvider(name = "Sources")
+    public static Object[][] getData() {
+        return new Object[][] {
+            { FunctionRuntimeType.PROCESS, new KafkaSourceTester() },
+            { FunctionRuntimeType.THREAD, new KafkaSourceTester() }
+        };
+    }
+
+    protected final SourceTester tester;
+
+    @Factory(dataProvider = "Sources")
+    PulsarIOSourceTest(FunctionRuntimeType functionRuntimeType, SourceTester 
tester) {
+        super(functionRuntimeType);
+        this.tester = tester;
+    }
+
+    @Override
+    protected PulsarClusterSpecBuilder beforeSetupCluster(String clusterName,
+                                                          
PulsarClusterSpecBuilder specBuilder) {
+        Map<String, GenericContainer<?>> externalServices = Maps.newHashMap();
+        externalServices.putAll(tester.newSourceService(clusterName));
+        return super.beforeSetupCluster(clusterName, specBuilder)
+            .externalServices(externalServices);
+    }
+
+    @Test
+    public void testSource() throws Exception {
+        final String tenant = TopicName.PUBLIC_TENANT;
+        final String namespace = TopicName.DEFAULT_NAMESPACE;
+        final String outputTopicName = "test-source-connector-output-topic-" + 
randomName(8);
+        final String sourceName = "test-source-connector-name-" + 
randomName(8);
+        final int numMessages = 20;
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+            .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+            .build();
+
+        @Cleanup
+        Consumer<String> consumer = client.newConsumer(Schema.STRING)
+            .topic(outputTopicName)
+            .subscriptionName("source-tester")
+            .subscriptionType(SubscriptionType.Exclusive)
+            .subscribe();
+
+        // prepare the testing environment for source
+        prepareSource();
+
+        // submit the source connector
+        submitSourceConnector(tenant, namespace, sourceName, outputTopicName);
+
+        // get source info
+        getSourceInfoSuccess(tenant, namespace, sourceName);
+
+        // get source status
+        getSourceStatus(tenant, namespace, sourceName);
+
+        // produce messages
+        Map<String, String> kvs = tester.produceSourceMessages(numMessages);
+
+        // wait for source to process messages
+        waitForProcessingMessages(tenant, namespace, sourceName, numMessages);
+
+        // validate the source result
+        validateSourceResult(consumer, kvs);
+
+        // delete the source
+        deleteSource(tenant, namespace, sourceName);
+
+        // get source info (source should be deleted)
+        getSourceInfoNotFound(tenant, namespace, sourceName);
+    }
+
+    protected void prepareSource() throws Exception {
+        tester.prepareSource();
+    }
+
+    protected void submitSourceConnector(String tenant,
+                                       String namespace,
+                                       String sourceName,
+                                       String outputTopicName) throws 
Exception {
+        String[] commands = {
+            PulsarCluster.ADMIN_SCRIPT,
+            "source", "create",
+            "--tenant", tenant,
+            "--namespace", namespace,
+            "--name", sourceName,
+            "--source-type", tester.sourceType(),
+            "--sourceConfig", new Gson().toJson(tester.sourceConfig()),
+            "--destinationTopicName", outputTopicName
+        };
+        log.info("Run command : {}", StringUtils.join(commands, ' '));
+        ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+        assertTrue(
+            result.getStdout().contains("\"Created successfully\""),
+            result.getStdout());
+    }
+
+    protected void getSourceInfoSuccess(String tenant, String namespace, 
String sourceName) throws Exception {
+        String[] commands = {
+            PulsarCluster.ADMIN_SCRIPT,
+            "functions",
+            "get",
+            "--tenant", tenant,
+            "--namespace", namespace,
+            "--name", sourceName
+        };
+        ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+        log.info("Get source info : {}", result.getStdout());
+        assertTrue(
+            result.getStdout().contains("\"builtin\": \"" + tester.sourceType 
+ "\""),
+            result.getStdout()
+        );
+    }
+
+    protected void getSourceStatus(String tenant, String namespace, String 
sourceName) throws Exception {
+        String[] commands = {
+            PulsarCluster.ADMIN_SCRIPT,
+            "functions",
+            "getstatus",
+            "--tenant", tenant,
+            "--namespace", namespace,
+            "--name", sourceName
+        };
+        while (true) {
+            ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+            log.info("Get source status : {}", result.getStdout());
+            if (result.getStdout().contains("\"running\": true")) {
+                return;
+            }
+            log.info("Backoff 1 second until the function is running");
+            TimeUnit.SECONDS.sleep(1);
+        }
+    }
+
+    protected void validateSourceResult(Consumer<String> consumer,
+                                        Map<String, String> kvs) throws 
Exception {
+        for (Map.Entry<String, String> kv : kvs.entrySet()) {
+            Message<String> msg = consumer.receive();
+            assertEquals(kv.getKey(), msg.getKey());
+            assertEquals(kv.getValue(), msg.getValue());
+        }
+    }
+
+    protected void waitForProcessingMessages(String tenant,
+                                             String namespace,
+                                             String sourceName,
+                                             int numMessages) throws Exception 
{
+        String[] commands = {
+            PulsarCluster.ADMIN_SCRIPT,
+            "functions",
+            "getstatus",
+            "--tenant", tenant,
+            "--namespace", namespace,
+            "--name", sourceName
+        };
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        while (true) {
+            ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+            log.info("Get source status : {}", result.getStdout());
+            if (result.getStdout().contains("\"numProcessed\": \"" + 
numMessages + "\"")) {
+                return;
+            }
+            log.info("{} ms has elapsed but the source hasn't process {} 
messages, backoff to wait for another 1 second",
+                stopwatch.elapsed(TimeUnit.MILLISECONDS), numMessages);
+            TimeUnit.SECONDS.sleep(1);
+        }
+    }
+
+    protected void deleteSource(String tenant, String namespace, String 
sourceName) throws Exception {
+        String[] commands = {
+            PulsarCluster.ADMIN_SCRIPT,
+            "source",
+            "delete",
+            "--tenant", tenant,
+            "--namespace", namespace,
+            "--name", sourceName
+        };
+        ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+        assertTrue(
+            result.getStdout().contains("Delete source successfully"),
+            result.getStdout()
+        );
+        assertTrue(
+            result.getStderr().isEmpty(),
+            result.getStderr()
+        );
+    }
+
+    protected void getSourceInfoNotFound(String tenant, String namespace, 
String sourceName) throws Exception {
+        String[] commands = {
+            PulsarCluster.ADMIN_SCRIPT,
+            "functions",
+            "get",
+            "--tenant", tenant,
+            "--namespace", namespace,
+            "--name", sourceName
+        };
+        ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+        assertTrue(result.getStderr().contains("Reason: Function " + 
sourceName + " doesn't exist"));
+    }
+}
diff --git 
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java
 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java
new file mode 100644
index 0000000..a28d086
--- /dev/null
+++ 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import java.util.Map;
+import org.testcontainers.containers.GenericContainer;
+import org.testng.collections.Maps;
+
+/**
+ * A tester used for testing a specific source.
+ */
+public abstract class SourceTester {
+
+    protected final String sourceType;
+    protected final Map<String, Object> sourceConfig;
+
+    protected SourceTester(String sourceType) {
+        this.sourceType = sourceType;
+        this.sourceConfig = Maps.newHashMap();
+    }
+
+    protected abstract Map<String, GenericContainer<?>> 
newSourceService(String clusterName);
+
+    protected String sourceType() {
+        return sourceType;
+    }
+
+    protected Map<String, Object> sourceConfig() {
+        return sourceConfig;
+    }
+
+    protected abstract void prepareSource() throws Exception;
+
+    protected abstract Map<String, String> produceSourceMessages(int 
numMessages) throws Exception;
+
+}

Reply via email to