sijie closed pull request #2170: Add integration test for kafka sink
URL: https://github.com/apache/incubator-pulsar/pull/2170
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pom.xml b/pom.xml
index a6d645379b..263d3624d2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -799,6 +799,11 @@ flexible messaging model and an intuitive client 
API.</description>
         <artifactId>testcontainers</artifactId>
         <version>${testcontainers.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.testcontainers</groupId>
+        <artifactId>kafka</artifactId>
+        <version>${testcontainers.version}</version>
+      </dependency>
       <dependency>
         <groupId>org.arquillian.cube</groupId>
         <artifactId>arquillian-cube-docker</artifactId>
diff --git 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/KafkaContainer.java
 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/KafkaContainer.java
deleted file mode 100644
index 83b5e42ba0..0000000000
--- 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/KafkaContainer.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.tests.containers;
-
-import lombok.extern.slf4j.Slf4j;
-import org.testcontainers.containers.BindMode;
-import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
-
-/**
- * Cassandra Container.
- */
-@Slf4j
-public class KafkaContainer<SelfT extends ChaosContainer<SelfT>> extends 
ChaosContainer<SelfT> {
-
-    public static final String NAME = "kafka";
-    public static final int INTERNAL_PORT = 9092;
-    public static final int PORT = 9093;
-
-    public KafkaContainer(String clusterName) {
-        super(clusterName, "confluentinc/cp-kafka:4.1.1");
-    }
-
-    @Override
-    protected void configure() {
-        super.configure();
-        this.withNetworkAliases(NAME)
-            .withExposedPorts(INTERNAL_PORT, PORT)
-            .withClasspathResourceMapping(
-                "kafka-zookeeper.properties", "/zookeeper.properties",
-                BindMode.READ_ONLY)
-            .withCommand("sh", "-c", "zookeeper-server-start 
/zookeeper.properties & /etc/confluent/docker/run")
-            .withEnv("KAFKA_LISTENERS",
-                "INTERNAL://kafka:" + INTERNAL_PORT + ",PLAINTEXT://" + 
"0.0.0.0" + ":" + PORT)
-            .withEnv("KAFKA_ZOOKEEPER_CONNECT", "localhost:2181")
-            .withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", 
"INTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
-            .withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "INTERNAL")
-            .withEnv("KAFKA_BROKER_ID", "1")
-            .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
-            .withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1")
-            .withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + "")
-            .withCreateContainerCmdModifier(createContainerCmd -> {
-                createContainerCmd.withHostName(NAME);
-                createContainerCmd.withName(clusterName + "-" + NAME);
-            })
-            .waitingFor(new HostPortWaitStrategy());
-    }
-}
diff --git 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/KafkaProxyContainer.java
 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/KafkaProxyContainer.java
deleted file mode 100644
index 052db7e8b0..0000000000
--- 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/KafkaProxyContainer.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.tests.containers;
-
-import lombok.extern.slf4j.Slf4j;
-import org.testcontainers.containers.SocatContainer;
-import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
-
-/**
- * Cassandra Container.
- */
-@Slf4j
-public class KafkaProxyContainer extends SocatContainer {
-
-    public static final String NAME = "kafka-proxy";
-
-    private final String clusterName;
-
-    public KafkaProxyContainer(String clusterName) {
-        super();
-        this.clusterName = clusterName;
-    }
-
-    @Override
-    protected void configure() {
-        super.configure();
-        this.withNetworkAliases(NAME)
-            .withTarget(KafkaContainer.PORT, KafkaContainer.NAME)
-            .withCreateContainerCmdModifier(createContainerCmd -> {
-                createContainerCmd.withHostName(NAME);
-                createContainerCmd.withName(clusterName + "-" + NAME);
-            })
-            .waitingFor(new HostPortWaitStrategy());
-    }
-}
diff --git 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
index 0f0bcffec5..331d7e2ddf 100644
--- 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
+++ 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
@@ -194,9 +194,10 @@ public void start() throws Exception {
         final Map<String, GenericContainer<?>> externalServices = 
spec.externalServices;
         if (null != externalServices) {
             externalServices.entrySet().parallelStream().forEach(service -> {
-                service.getValue().withNetwork(network);
-                service.getValue().withNetworkAliases(service.getKey());
-                service.getValue().start();
+                GenericContainer<?> serviceContainer = service.getValue();
+                serviceContainer.withNetwork(network);
+                serviceContainer.withNetworkAliases(service.getKey());
+                serviceContainer.start();
                 log.info("Successfully start external service {}.", 
service.getKey());
             });
         }
diff --git a/tests/integration/semantics/pom.xml 
b/tests/integration/semantics/pom.xml
index b98237ada4..09258dba21 100644
--- a/tests/integration/semantics/pom.xml
+++ b/tests/integration/semantics/pom.xml
@@ -58,5 +58,16 @@
       <artifactId>cassandra-driver-core</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>kafka</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-io-kafka</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
diff --git 
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
index 483b564d40..6bbfbe3a25 100644
--- 
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
+++ 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
@@ -25,17 +25,20 @@
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
+import com.google.common.collect.Lists;
 import java.util.List;
 import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.tests.containers.CassandraContainer;
 import org.apache.pulsar.tests.integration.utils.TestUtils;
+import org.testcontainers.containers.GenericContainer;
+import org.testng.collections.Maps;
 
 /**
  * A tester for testing cassandra sink.
  */
 @Slf4j
-public class CassandraSinkTester extends SinkTester<CassandraContainer> {
+public class CassandraSinkTester extends SinkTester {
 
     private static final String ROOTS = "cassandra";
     private static final String KEY = "key";
@@ -64,9 +67,11 @@ public CassandraSinkTester() {
     }
 
     @Override
-    protected CassandraContainer newSinkService(String clusterName) {
+    protected Map<String, GenericContainer<?>> newSinkService(String 
clusterName) {
         this.cassandraCluster = new CassandraContainer(clusterName);
-        return this.cassandraCluster;
+        Map<String, GenericContainer<?>> containers = Maps.newHashMap();
+        containers.put(CassandraContainer.NAME, cassandraCluster);
+        return containers;
     }
 
     @Override
diff --git 
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
new file mode 100644
index 0000000000..66595316c7
--- /dev/null
+++ 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.pulsar.tests.integration.utils.TestUtils;
+import org.testcontainers.containers.Container.ExecResult;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
+import org.testng.collections.Maps;
+
+/**
+ * A tester for testing kafka sink.
+ */
+@Slf4j
+public class KafkaSinkTester extends SinkTester {
+
+    private static final String NAME = "kafka";
+
+    private final String kafkaTopicName;
+
+    private KafkaContainer kafkaContainer;
+
+    private KafkaConsumer<String, String> kafkaConsumer;
+
+    public KafkaSinkTester() {
+        super(NAME);
+        String suffix = TestUtils.randomName(8) + "_" + 
System.currentTimeMillis();
+        this.kafkaTopicName = "kafka_sink_topic_" + suffix;
+
+        sinkConfig.put("bootstrapServers", NAME + ":9092");
+        sinkConfig.put("acks", "all");
+        sinkConfig.put("batchSize", 1L);
+        sinkConfig.put("maxRequestSize", 1048576L);
+        sinkConfig.put("topic", kafkaTopicName);
+    }
+
+    @Override
+    protected Map<String, GenericContainer<?>> newSinkService(String 
clusterName) {
+        this.kafkaContainer = new KafkaContainer()
+            .withEmbeddedZookeeper()
+            .withNetworkAliases(NAME)
+            .withCreateContainerCmdModifier(createContainerCmd -> 
createContainerCmd
+                .withName(NAME)
+                .withHostName(clusterName + "-" + NAME));
+
+        Map<String, GenericContainer<?>> containers = Maps.newHashMap();
+        containers.put("kafka", kafkaContainer);
+        return containers;
+    }
+
+    @Override
+    protected void prepareSink() throws Exception {
+        ExecResult execResult = kafkaContainer.execInContainer(
+            "/usr/bin/kafka-topics",
+            "--create",
+            "--zookeeper",
+            "localhost:2181",
+            "--partitions",
+            "1",
+            "--replication-factor",
+            "1",
+            "--topic",
+            kafkaTopicName);
+        assertTrue(
+            execResult.getStdout().contains("Created topic"),
+            execResult.getStdout());
+
+        kafkaConsumer = new KafkaConsumer<>(
+            ImmutableMap.of(
+                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
kafkaContainer.getBootstrapServers(),
+                ConsumerConfig.GROUP_ID_CONFIG, "sink-test-" + 
TestUtils.randomName(8),
+                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"
+            ),
+            new StringDeserializer(),
+            new StringDeserializer()
+        );
+        kafkaConsumer.subscribe(Arrays.asList(kafkaTopicName));
+        log.info("Successfully subscribe to kafka topic {}", kafkaTopicName);
+    }
+
+    @Override
+    protected void validateSinkResult(Map<String, String> kvs) {
+        Iterator<Map.Entry<String, String>> kvIter = kvs.entrySet().iterator();
+        while (kvIter.hasNext()) {
+            ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
+            log.info("Received {} records from kafka topic {}",
+                records.count(), kafkaTopicName);
+            if (records.isEmpty()) {
+                continue;
+            }
+
+            Iterator<ConsumerRecord<String, String>> recordsIter = 
records.iterator();
+            while (recordsIter.hasNext() && kvIter.hasNext()) {
+                ConsumerRecord<String, String> consumerRecord = 
recordsIter.next();
+                Map.Entry<String, String> expectedRecord = kvIter.next();
+                assertEquals(expectedRecord.getKey(), consumerRecord.key());
+                assertEquals(expectedRecord.getValue(), 
consumerRecord.value());
+            }
+        }
+    }
+}
diff --git 
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSinkTest.java
 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSinkTest.java
index 52719bff70..464a716fcd 100644
--- 
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSinkTest.java
+++ 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSinkTest.java
@@ -22,6 +22,7 @@
 
 import com.google.common.base.Stopwatch;
 import com.google.gson.Gson;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
@@ -52,7 +53,9 @@
     public static Object[][] getData() {
         return new Object[][] {
             { FunctionRuntimeType.PROCESS,  new CassandraSinkTester() },
-            { FunctionRuntimeType.THREAD, new CassandraSinkTester() }
+            { FunctionRuntimeType.THREAD, new CassandraSinkTester() },
+            { FunctionRuntimeType.PROCESS, new KafkaSinkTester() },
+            { FunctionRuntimeType.THREAD, new KafkaSinkTester() }
         };
     }
 
@@ -68,7 +71,7 @@
     protected PulsarClusterSpecBuilder beforeSetupCluster(String clusterName,
                                                           
PulsarClusterSpecBuilder specBuilder) {
         Map<String, GenericContainer<?>> externalServices = Maps.newHashMap();
-        externalServices.put(tester.sinkType, 
tester.newSinkService(clusterName));
+        externalServices.putAll(tester.newSinkService(clusterName));
         return super.beforeSetupCluster(clusterName, specBuilder)
             .externalServices(externalServices);
     }
@@ -109,7 +112,7 @@ public void testSink() throws Exception {
         getSinkInfoNotFound(tenant, namespace, sinkName);
     }
 
-    protected void prepareSink() {
+    protected void prepareSink() throws Exception {
         tester.prepareSink();
     }
 
@@ -146,7 +149,7 @@ protected void getSinkInfoSuccess(String tenant, String 
namespace, String sinkNa
         ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
         log.info("Get sink info : {}", result.getStdout());
         assertTrue(
-            result.getStdout().contains("\"builtin\": \"cassandra\""),
+            result.getStdout().contains("\"builtin\": \"" + tester.sinkType + 
"\""),
             result.getStdout()
         );
     }
@@ -181,7 +184,7 @@ protected void getSinkStatus(String tenant, String 
namespace, String sinkName) t
         Producer<String> producer = client.newProducer(Schema.STRING)
             .topic(inputTopicName)
             .create();
-        Map<String, String> kvs = Maps.newHashMap();
+        LinkedHashMap<String, String> kvs = new LinkedHashMap<>();
         for (int i = 0; i < numMessages; i++) {
             String key = "key-" + i;
             String value = "value-" + i;
diff --git 
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
index dbc5884881..75434be529 100644
--- 
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
+++ 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
@@ -25,7 +25,7 @@
 /**
  * A tester used for testing a specific sink.
  */
-public abstract class SinkTester<SINK_SERVICE_CONTAINER extends 
GenericContainer> {
+public abstract class SinkTester {
 
     protected final String sinkType;
     protected final Map<String, Object> sinkConfig;
@@ -35,7 +35,7 @@ protected SinkTester(String sinkType) {
         this.sinkConfig = Maps.newHashMap();
     }
 
-    protected abstract SINK_SERVICE_CONTAINER newSinkService(String 
clusterName);
+    protected abstract Map<String, GenericContainer<?>> newSinkService(String 
clusterName);
 
     protected String sinkType() {
         return sinkType;
@@ -45,7 +45,7 @@ protected String sinkType() {
         return sinkConfig;
     }
 
-    protected abstract void prepareSink();
+    protected abstract void prepareSink() throws Exception;
 
     protected abstract void validateSinkResult(Map<String, String> kvs);
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to