This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d6fb900  Add integration test for kafka sink (#2170)
d6fb900 is described below

commit d6fb900f3741603290f1b93fdb86fe8c1e8d70e5
Author: Sijie Guo <[email protected]>
AuthorDate: Tue Jul 17 15:07:44 2018 -0700

    Add integration test for kafka sink (#2170)
    
    *Motivation*
    
     #2167 introduces a sink test for cassandra. this PR adds a test for kafka 
sink.
    
    *Changes*
    
    - Add a KafkaSinkTester so `PulsarIOSinkTest` will test kafka sink
    - Remove customized `KafkaContainer` and `KafkaProxyContainer` and use 
testcontainers's KafkaContainer
---
 pom.xml                                            |   5 +
 .../pulsar/tests/containers/KafkaContainer.java    |  63 ----------
 .../tests/containers/KafkaProxyContainer.java      |  51 --------
 .../pulsar/tests/topologies/PulsarCluster.java     |   7 +-
 tests/integration/semantics/pom.xml                |  11 ++
 .../tests/integration/io/CassandraSinkTester.java  |  11 +-
 .../tests/integration/io/KafkaSinkTester.java      | 130 +++++++++++++++++++++
 .../tests/integration/io/PulsarIOSinkTest.java     |  13 ++-
 .../pulsar/tests/integration/io/SinkTester.java    |   6 +-
 9 files changed, 169 insertions(+), 128 deletions(-)

diff --git a/pom.xml b/pom.xml
index a6d6453..263d362 100644
--- a/pom.xml
+++ b/pom.xml
@@ -800,6 +800,11 @@ flexible messaging model and an intuitive client 
API.</description>
         <version>${testcontainers.version}</version>
       </dependency>
       <dependency>
+        <groupId>org.testcontainers</groupId>
+        <artifactId>kafka</artifactId>
+        <version>${testcontainers.version}</version>
+      </dependency>
+      <dependency>
         <groupId>org.arquillian.cube</groupId>
         <artifactId>arquillian-cube-docker</artifactId>
         <version>${arquillian-cube.version}</version>
diff --git 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/KafkaContainer.java
 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/KafkaContainer.java
deleted file mode 100644
index 83b5e42..0000000
--- 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/KafkaContainer.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.tests.containers;
-
-import lombok.extern.slf4j.Slf4j;
-import org.testcontainers.containers.BindMode;
-import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
-
-/**
- * Cassandra Container.
- */
-@Slf4j
-public class KafkaContainer<SelfT extends ChaosContainer<SelfT>> extends 
ChaosContainer<SelfT> {
-
-    public static final String NAME = "kafka";
-    public static final int INTERNAL_PORT = 9092;
-    public static final int PORT = 9093;
-
-    public KafkaContainer(String clusterName) {
-        super(clusterName, "confluentinc/cp-kafka:4.1.1");
-    }
-
-    @Override
-    protected void configure() {
-        super.configure();
-        this.withNetworkAliases(NAME)
-            .withExposedPorts(INTERNAL_PORT, PORT)
-            .withClasspathResourceMapping(
-                "kafka-zookeeper.properties", "/zookeeper.properties",
-                BindMode.READ_ONLY)
-            .withCommand("sh", "-c", "zookeeper-server-start 
/zookeeper.properties & /etc/confluent/docker/run")
-            .withEnv("KAFKA_LISTENERS",
-                "INTERNAL://kafka:" + INTERNAL_PORT + ",PLAINTEXT://" + 
"0.0.0.0" + ":" + PORT)
-            .withEnv("KAFKA_ZOOKEEPER_CONNECT", "localhost:2181")
-            .withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", 
"INTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
-            .withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "INTERNAL")
-            .withEnv("KAFKA_BROKER_ID", "1")
-            .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
-            .withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1")
-            .withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + "")
-            .withCreateContainerCmdModifier(createContainerCmd -> {
-                createContainerCmd.withHostName(NAME);
-                createContainerCmd.withName(clusterName + "-" + NAME);
-            })
-            .waitingFor(new HostPortWaitStrategy());
-    }
-}
diff --git 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/KafkaProxyContainer.java
 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/KafkaProxyContainer.java
deleted file mode 100644
index 052db7e..0000000
--- 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/KafkaProxyContainer.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.tests.containers;
-
-import lombok.extern.slf4j.Slf4j;
-import org.testcontainers.containers.SocatContainer;
-import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
-
-/**
- * Cassandra Container.
- */
-@Slf4j
-public class KafkaProxyContainer extends SocatContainer {
-
-    public static final String NAME = "kafka-proxy";
-
-    private final String clusterName;
-
-    public KafkaProxyContainer(String clusterName) {
-        super();
-        this.clusterName = clusterName;
-    }
-
-    @Override
-    protected void configure() {
-        super.configure();
-        this.withNetworkAliases(NAME)
-            .withTarget(KafkaContainer.PORT, KafkaContainer.NAME)
-            .withCreateContainerCmdModifier(createContainerCmd -> {
-                createContainerCmd.withHostName(NAME);
-                createContainerCmd.withName(clusterName + "-" + NAME);
-            })
-            .waitingFor(new HostPortWaitStrategy());
-    }
-}
diff --git 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
index 0f0bcff..331d7e2 100644
--- 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
+++ 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
@@ -194,9 +194,10 @@ public class PulsarCluster {
         final Map<String, GenericContainer<?>> externalServices = 
spec.externalServices;
         if (null != externalServices) {
             externalServices.entrySet().parallelStream().forEach(service -> {
-                service.getValue().withNetwork(network);
-                service.getValue().withNetworkAliases(service.getKey());
-                service.getValue().start();
+                GenericContainer<?> serviceContainer = service.getValue();
+                serviceContainer.withNetwork(network);
+                serviceContainer.withNetworkAliases(service.getKey());
+                serviceContainer.start();
                 log.info("Successfully start external service {}.", 
service.getKey());
             });
         }
diff --git a/tests/integration/semantics/pom.xml 
b/tests/integration/semantics/pom.xml
index b98237a..09258db 100644
--- a/tests/integration/semantics/pom.xml
+++ b/tests/integration/semantics/pom.xml
@@ -58,5 +58,16 @@
       <artifactId>cassandra-driver-core</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>kafka</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-io-kafka</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
diff --git 
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
index 483b564..6bbfbe3 100644
--- 
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
+++ 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
@@ -25,17 +25,20 @@ import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
+import com.google.common.collect.Lists;
 import java.util.List;
 import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.tests.containers.CassandraContainer;
 import org.apache.pulsar.tests.integration.utils.TestUtils;
+import org.testcontainers.containers.GenericContainer;
+import org.testng.collections.Maps;
 
 /**
  * A tester for testing cassandra sink.
  */
 @Slf4j
-public class CassandraSinkTester extends SinkTester<CassandraContainer> {
+public class CassandraSinkTester extends SinkTester {
 
     private static final String ROOTS = "cassandra";
     private static final String KEY = "key";
@@ -64,9 +67,11 @@ public class CassandraSinkTester extends 
SinkTester<CassandraContainer> {
     }
 
     @Override
-    protected CassandraContainer newSinkService(String clusterName) {
+    protected Map<String, GenericContainer<?>> newSinkService(String 
clusterName) {
         this.cassandraCluster = new CassandraContainer(clusterName);
-        return this.cassandraCluster;
+        Map<String, GenericContainer<?>> containers = Maps.newHashMap();
+        containers.put(CassandraContainer.NAME, cassandraCluster);
+        return containers;
     }
 
     @Override
diff --git 
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
new file mode 100644
index 0000000..6659531
--- /dev/null
+++ 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.pulsar.tests.integration.utils.TestUtils;
+import org.testcontainers.containers.Container.ExecResult;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
+import org.testng.collections.Maps;
+
+/**
+ * A tester for testing kafka sink.
+ */
+@Slf4j
+public class KafkaSinkTester extends SinkTester {
+
+    private static final String NAME = "kafka";
+
+    private final String kafkaTopicName;
+
+    private KafkaContainer kafkaContainer;
+
+    private KafkaConsumer<String, String> kafkaConsumer;
+
+    public KafkaSinkTester() {
+        super(NAME);
+        String suffix = TestUtils.randomName(8) + "_" + 
System.currentTimeMillis();
+        this.kafkaTopicName = "kafka_sink_topic_" + suffix;
+
+        sinkConfig.put("bootstrapServers", NAME + ":9092");
+        sinkConfig.put("acks", "all");
+        sinkConfig.put("batchSize", 1L);
+        sinkConfig.put("maxRequestSize", 1048576L);
+        sinkConfig.put("topic", kafkaTopicName);
+    }
+
+    @Override
+    protected Map<String, GenericContainer<?>> newSinkService(String 
clusterName) {
+        this.kafkaContainer = new KafkaContainer()
+            .withEmbeddedZookeeper()
+            .withNetworkAliases(NAME)
+            .withCreateContainerCmdModifier(createContainerCmd -> 
createContainerCmd
+                .withName(NAME)
+                .withHostName(clusterName + "-" + NAME));
+
+        Map<String, GenericContainer<?>> containers = Maps.newHashMap();
+        containers.put("kafka", kafkaContainer);
+        return containers;
+    }
+
+    @Override
+    protected void prepareSink() throws Exception {
+        ExecResult execResult = kafkaContainer.execInContainer(
+            "/usr/bin/kafka-topics",
+            "--create",
+            "--zookeeper",
+            "localhost:2181",
+            "--partitions",
+            "1",
+            "--replication-factor",
+            "1",
+            "--topic",
+            kafkaTopicName);
+        assertTrue(
+            execResult.getStdout().contains("Created topic"),
+            execResult.getStdout());
+
+        kafkaConsumer = new KafkaConsumer<>(
+            ImmutableMap.of(
+                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
kafkaContainer.getBootstrapServers(),
+                ConsumerConfig.GROUP_ID_CONFIG, "sink-test-" + 
TestUtils.randomName(8),
+                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"
+            ),
+            new StringDeserializer(),
+            new StringDeserializer()
+        );
+        kafkaConsumer.subscribe(Arrays.asList(kafkaTopicName));
+        log.info("Successfully subscribe to kafka topic {}", kafkaTopicName);
+    }
+
+    @Override
+    protected void validateSinkResult(Map<String, String> kvs) {
+        Iterator<Map.Entry<String, String>> kvIter = kvs.entrySet().iterator();
+        while (kvIter.hasNext()) {
+            ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
+            log.info("Received {} records from kafka topic {}",
+                records.count(), kafkaTopicName);
+            if (records.isEmpty()) {
+                continue;
+            }
+
+            Iterator<ConsumerRecord<String, String>> recordsIter = 
records.iterator();
+            while (recordsIter.hasNext() && kvIter.hasNext()) {
+                ConsumerRecord<String, String> consumerRecord = 
recordsIter.next();
+                Map.Entry<String, String> expectedRecord = kvIter.next();
+                assertEquals(expectedRecord.getKey(), consumerRecord.key());
+                assertEquals(expectedRecord.getValue(), 
consumerRecord.value());
+            }
+        }
+    }
+}
diff --git 
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSinkTest.java
 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSinkTest.java
index 52719bf..464a716 100644
--- 
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSinkTest.java
+++ 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSinkTest.java
@@ -22,6 +22,7 @@ import static org.testng.Assert.assertTrue;
 
 import com.google.common.base.Stopwatch;
 import com.google.gson.Gson;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
@@ -52,7 +53,9 @@ public class PulsarIOSinkTest extends PulsarFunctionsTestBase 
{
     public static Object[][] getData() {
         return new Object[][] {
             { FunctionRuntimeType.PROCESS,  new CassandraSinkTester() },
-            { FunctionRuntimeType.THREAD, new CassandraSinkTester() }
+            { FunctionRuntimeType.THREAD, new CassandraSinkTester() },
+            { FunctionRuntimeType.PROCESS, new KafkaSinkTester() },
+            { FunctionRuntimeType.THREAD, new KafkaSinkTester() }
         };
     }
 
@@ -68,7 +71,7 @@ public class PulsarIOSinkTest extends PulsarFunctionsTestBase 
{
     protected PulsarClusterSpecBuilder beforeSetupCluster(String clusterName,
                                                           
PulsarClusterSpecBuilder specBuilder) {
         Map<String, GenericContainer<?>> externalServices = Maps.newHashMap();
-        externalServices.put(tester.sinkType, 
tester.newSinkService(clusterName));
+        externalServices.putAll(tester.newSinkService(clusterName));
         return super.beforeSetupCluster(clusterName, specBuilder)
             .externalServices(externalServices);
     }
@@ -109,7 +112,7 @@ public class PulsarIOSinkTest extends 
PulsarFunctionsTestBase {
         getSinkInfoNotFound(tenant, namespace, sinkName);
     }
 
-    protected void prepareSink() {
+    protected void prepareSink() throws Exception {
         tester.prepareSink();
     }
 
@@ -146,7 +149,7 @@ public class PulsarIOSinkTest extends 
PulsarFunctionsTestBase {
         ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
         log.info("Get sink info : {}", result.getStdout());
         assertTrue(
-            result.getStdout().contains("\"builtin\": \"cassandra\""),
+            result.getStdout().contains("\"builtin\": \"" + tester.sinkType + 
"\""),
             result.getStdout()
         );
     }
@@ -181,7 +184,7 @@ public class PulsarIOSinkTest extends 
PulsarFunctionsTestBase {
         Producer<String> producer = client.newProducer(Schema.STRING)
             .topic(inputTopicName)
             .create();
-        Map<String, String> kvs = Maps.newHashMap();
+        LinkedHashMap<String, String> kvs = new LinkedHashMap<>();
         for (int i = 0; i < numMessages; i++) {
             String key = "key-" + i;
             String value = "value-" + i;
diff --git 
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
index dbc5884..75434be 100644
--- 
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
+++ 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
@@ -25,7 +25,7 @@ import org.testng.collections.Maps;
 /**
  * A tester used for testing a specific sink.
  */
-public abstract class SinkTester<SINK_SERVICE_CONTAINER extends 
GenericContainer> {
+public abstract class SinkTester {
 
     protected final String sinkType;
     protected final Map<String, Object> sinkConfig;
@@ -35,7 +35,7 @@ public abstract class SinkTester<SINK_SERVICE_CONTAINER 
extends GenericContainer
         this.sinkConfig = Maps.newHashMap();
     }
 
-    protected abstract SINK_SERVICE_CONTAINER newSinkService(String 
clusterName);
+    protected abstract Map<String, GenericContainer<?>> newSinkService(String 
clusterName);
 
     protected String sinkType() {
         return sinkType;
@@ -45,7 +45,7 @@ public abstract class SinkTester<SINK_SERVICE_CONTAINER 
extends GenericContainer
         return sinkConfig;
     }
 
-    protected abstract void prepareSink();
+    protected abstract void prepareSink() throws Exception;
 
     protected abstract void validateSinkResult(Map<String, String> kvs);
 

Reply via email to