This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d6fb900 Add integration test for kafka sink (#2170)
d6fb900 is described below
commit d6fb900f3741603290f1b93fdb86fe8c1e8d70e5
Author: Sijie Guo <[email protected]>
AuthorDate: Tue Jul 17 15:07:44 2018 -0700
Add integration test for kafka sink (#2170)
*Motivation*
#2167 introduces a sink test for cassandra. this PR adds a test for kafka
sink.
*Changes*
- Add a KafkaSinkTester so `PulsarIOSinkTest` will test kafka sink
- Remove customized `KafkaContainer` and `KafkaProxyContainer` and use
testcontainers's KafkaContainer
---
pom.xml | 5 +
.../pulsar/tests/containers/KafkaContainer.java | 63 ----------
.../tests/containers/KafkaProxyContainer.java | 51 --------
.../pulsar/tests/topologies/PulsarCluster.java | 7 +-
tests/integration/semantics/pom.xml | 11 ++
.../tests/integration/io/CassandraSinkTester.java | 11 +-
.../tests/integration/io/KafkaSinkTester.java | 130 +++++++++++++++++++++
.../tests/integration/io/PulsarIOSinkTest.java | 13 ++-
.../pulsar/tests/integration/io/SinkTester.java | 6 +-
9 files changed, 169 insertions(+), 128 deletions(-)
diff --git a/pom.xml b/pom.xml
index a6d6453..263d362 100644
--- a/pom.xml
+++ b/pom.xml
@@ -800,6 +800,11 @@ flexible messaging model and an intuitive client
API.</description>
<version>${testcontainers.version}</version>
</dependency>
<dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>kafka</artifactId>
+ <version>${testcontainers.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.arquillian.cube</groupId>
<artifactId>arquillian-cube-docker</artifactId>
<version>${arquillian-cube.version}</version>
diff --git
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/KafkaContainer.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/KafkaContainer.java
deleted file mode 100644
index 83b5e42..0000000
---
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/KafkaContainer.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.tests.containers;
-
-import lombok.extern.slf4j.Slf4j;
-import org.testcontainers.containers.BindMode;
-import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
-
-/**
- * Cassandra Container.
- */
-@Slf4j
-public class KafkaContainer<SelfT extends ChaosContainer<SelfT>> extends
ChaosContainer<SelfT> {
-
- public static final String NAME = "kafka";
- public static final int INTERNAL_PORT = 9092;
- public static final int PORT = 9093;
-
- public KafkaContainer(String clusterName) {
- super(clusterName, "confluentinc/cp-kafka:4.1.1");
- }
-
- @Override
- protected void configure() {
- super.configure();
- this.withNetworkAliases(NAME)
- .withExposedPorts(INTERNAL_PORT, PORT)
- .withClasspathResourceMapping(
- "kafka-zookeeper.properties", "/zookeeper.properties",
- BindMode.READ_ONLY)
- .withCommand("sh", "-c", "zookeeper-server-start
/zookeeper.properties & /etc/confluent/docker/run")
- .withEnv("KAFKA_LISTENERS",
- "INTERNAL://kafka:" + INTERNAL_PORT + ",PLAINTEXT://" +
"0.0.0.0" + ":" + PORT)
- .withEnv("KAFKA_ZOOKEEPER_CONNECT", "localhost:2181")
- .withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
"INTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
- .withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "INTERNAL")
- .withEnv("KAFKA_BROKER_ID", "1")
- .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
- .withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1")
- .withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + "")
- .withCreateContainerCmdModifier(createContainerCmd -> {
- createContainerCmd.withHostName(NAME);
- createContainerCmd.withName(clusterName + "-" + NAME);
- })
- .waitingFor(new HostPortWaitStrategy());
- }
-}
diff --git
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/KafkaProxyContainer.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/KafkaProxyContainer.java
deleted file mode 100644
index 052db7e..0000000
---
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/KafkaProxyContainer.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.tests.containers;
-
-import lombok.extern.slf4j.Slf4j;
-import org.testcontainers.containers.SocatContainer;
-import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
-
-/**
- * Cassandra Container.
- */
-@Slf4j
-public class KafkaProxyContainer extends SocatContainer {
-
- public static final String NAME = "kafka-proxy";
-
- private final String clusterName;
-
- public KafkaProxyContainer(String clusterName) {
- super();
- this.clusterName = clusterName;
- }
-
- @Override
- protected void configure() {
- super.configure();
- this.withNetworkAliases(NAME)
- .withTarget(KafkaContainer.PORT, KafkaContainer.NAME)
- .withCreateContainerCmdModifier(createContainerCmd -> {
- createContainerCmd.withHostName(NAME);
- createContainerCmd.withName(clusterName + "-" + NAME);
- })
- .waitingFor(new HostPortWaitStrategy());
- }
-}
diff --git
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
index 0f0bcff..331d7e2 100644
---
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
+++
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
@@ -194,9 +194,10 @@ public class PulsarCluster {
final Map<String, GenericContainer<?>> externalServices =
spec.externalServices;
if (null != externalServices) {
externalServices.entrySet().parallelStream().forEach(service -> {
- service.getValue().withNetwork(network);
- service.getValue().withNetworkAliases(service.getKey());
- service.getValue().start();
+ GenericContainer<?> serviceContainer = service.getValue();
+ serviceContainer.withNetwork(network);
+ serviceContainer.withNetworkAliases(service.getKey());
+ serviceContainer.start();
log.info("Successfully start external service {}.",
service.getKey());
});
}
diff --git a/tests/integration/semantics/pom.xml
b/tests/integration/semantics/pom.xml
index b98237a..09258db 100644
--- a/tests/integration/semantics/pom.xml
+++ b/tests/integration/semantics/pom.xml
@@ -58,5 +58,16 @@
<artifactId>cassandra-driver-core</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>kafka</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-io-kafka</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
index 483b564..6bbfbe3 100644
---
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
+++
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
@@ -25,17 +25,20 @@ import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
+import com.google.common.collect.Lists;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.tests.containers.CassandraContainer;
import org.apache.pulsar.tests.integration.utils.TestUtils;
+import org.testcontainers.containers.GenericContainer;
+import org.testng.collections.Maps;
/**
* A tester for testing cassandra sink.
*/
@Slf4j
-public class CassandraSinkTester extends SinkTester<CassandraContainer> {
+public class CassandraSinkTester extends SinkTester {
private static final String ROOTS = "cassandra";
private static final String KEY = "key";
@@ -64,9 +67,11 @@ public class CassandraSinkTester extends
SinkTester<CassandraContainer> {
}
@Override
- protected CassandraContainer newSinkService(String clusterName) {
+ protected Map<String, GenericContainer<?>> newSinkService(String
clusterName) {
this.cassandraCluster = new CassandraContainer(clusterName);
- return this.cassandraCluster;
+ Map<String, GenericContainer<?>> containers = Maps.newHashMap();
+ containers.put(CassandraContainer.NAME, cassandraCluster);
+ return containers;
}
@Override
diff --git
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
new file mode 100644
index 0000000..6659531
--- /dev/null
+++
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.pulsar.tests.integration.utils.TestUtils;
+import org.testcontainers.containers.Container.ExecResult;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
+import org.testng.collections.Maps;
+
+/**
+ * A tester for testing kafka sink.
+ */
+@Slf4j
+public class KafkaSinkTester extends SinkTester {
+
+ private static final String NAME = "kafka";
+
+ private final String kafkaTopicName;
+
+ private KafkaContainer kafkaContainer;
+
+ private KafkaConsumer<String, String> kafkaConsumer;
+
+ public KafkaSinkTester() {
+ super(NAME);
+ String suffix = TestUtils.randomName(8) + "_" +
System.currentTimeMillis();
+ this.kafkaTopicName = "kafka_sink_topic_" + suffix;
+
+ sinkConfig.put("bootstrapServers", NAME + ":9092");
+ sinkConfig.put("acks", "all");
+ sinkConfig.put("batchSize", 1L);
+ sinkConfig.put("maxRequestSize", 1048576L);
+ sinkConfig.put("topic", kafkaTopicName);
+ }
+
+ @Override
+ protected Map<String, GenericContainer<?>> newSinkService(String
clusterName) {
+ this.kafkaContainer = new KafkaContainer()
+ .withEmbeddedZookeeper()
+ .withNetworkAliases(NAME)
+ .withCreateContainerCmdModifier(createContainerCmd ->
createContainerCmd
+ .withName(NAME)
+ .withHostName(clusterName + "-" + NAME));
+
+ Map<String, GenericContainer<?>> containers = Maps.newHashMap();
+ containers.put("kafka", kafkaContainer);
+ return containers;
+ }
+
+ @Override
+ protected void prepareSink() throws Exception {
+ ExecResult execResult = kafkaContainer.execInContainer(
+ "/usr/bin/kafka-topics",
+ "--create",
+ "--zookeeper",
+ "localhost:2181",
+ "--partitions",
+ "1",
+ "--replication-factor",
+ "1",
+ "--topic",
+ kafkaTopicName);
+ assertTrue(
+ execResult.getStdout().contains("Created topic"),
+ execResult.getStdout());
+
+ kafkaConsumer = new KafkaConsumer<>(
+ ImmutableMap.of(
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaContainer.getBootstrapServers(),
+ ConsumerConfig.GROUP_ID_CONFIG, "sink-test-" +
TestUtils.randomName(8),
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"
+ ),
+ new StringDeserializer(),
+ new StringDeserializer()
+ );
+ kafkaConsumer.subscribe(Arrays.asList(kafkaTopicName));
+ log.info("Successfully subscribe to kafka topic {}", kafkaTopicName);
+ }
+
+ @Override
+ protected void validateSinkResult(Map<String, String> kvs) {
+ Iterator<Map.Entry<String, String>> kvIter = kvs.entrySet().iterator();
+ while (kvIter.hasNext()) {
+ ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
+ log.info("Received {} records from kafka topic {}",
+ records.count(), kafkaTopicName);
+ if (records.isEmpty()) {
+ continue;
+ }
+
+ Iterator<ConsumerRecord<String, String>> recordsIter =
records.iterator();
+ while (recordsIter.hasNext() && kvIter.hasNext()) {
+ ConsumerRecord<String, String> consumerRecord =
recordsIter.next();
+ Map.Entry<String, String> expectedRecord = kvIter.next();
+ assertEquals(expectedRecord.getKey(), consumerRecord.key());
+ assertEquals(expectedRecord.getValue(),
consumerRecord.value());
+ }
+ }
+ }
+}
diff --git
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSinkTest.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSinkTest.java
index 52719bf..464a716 100644
---
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSinkTest.java
+++
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSinkTest.java
@@ -22,6 +22,7 @@ import static org.testng.Assert.assertTrue;
import com.google.common.base.Stopwatch;
import com.google.gson.Gson;
+import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
@@ -52,7 +53,9 @@ public class PulsarIOSinkTest extends PulsarFunctionsTestBase
{
public static Object[][] getData() {
return new Object[][] {
{ FunctionRuntimeType.PROCESS, new CassandraSinkTester() },
- { FunctionRuntimeType.THREAD, new CassandraSinkTester() }
+ { FunctionRuntimeType.THREAD, new CassandraSinkTester() },
+ { FunctionRuntimeType.PROCESS, new KafkaSinkTester() },
+ { FunctionRuntimeType.THREAD, new KafkaSinkTester() }
};
}
@@ -68,7 +71,7 @@ public class PulsarIOSinkTest extends PulsarFunctionsTestBase
{
protected PulsarClusterSpecBuilder beforeSetupCluster(String clusterName,
PulsarClusterSpecBuilder specBuilder) {
Map<String, GenericContainer<?>> externalServices = Maps.newHashMap();
- externalServices.put(tester.sinkType,
tester.newSinkService(clusterName));
+ externalServices.putAll(tester.newSinkService(clusterName));
return super.beforeSetupCluster(clusterName, specBuilder)
.externalServices(externalServices);
}
@@ -109,7 +112,7 @@ public class PulsarIOSinkTest extends
PulsarFunctionsTestBase {
getSinkInfoNotFound(tenant, namespace, sinkName);
}
- protected void prepareSink() {
+ protected void prepareSink() throws Exception {
tester.prepareSink();
}
@@ -146,7 +149,7 @@ public class PulsarIOSinkTest extends
PulsarFunctionsTestBase {
ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
log.info("Get sink info : {}", result.getStdout());
assertTrue(
- result.getStdout().contains("\"builtin\": \"cassandra\""),
+ result.getStdout().contains("\"builtin\": \"" + tester.sinkType +
"\""),
result.getStdout()
);
}
@@ -181,7 +184,7 @@ public class PulsarIOSinkTest extends
PulsarFunctionsTestBase {
Producer<String> producer = client.newProducer(Schema.STRING)
.topic(inputTopicName)
.create();
- Map<String, String> kvs = Maps.newHashMap();
+ LinkedHashMap<String, String> kvs = new LinkedHashMap<>();
for (int i = 0; i < numMessages; i++) {
String key = "key-" + i;
String value = "value-" + i;
diff --git
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
index dbc5884..75434be 100644
---
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
+++
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
@@ -25,7 +25,7 @@ import org.testng.collections.Maps;
/**
* A tester used for testing a specific sink.
*/
-public abstract class SinkTester<SINK_SERVICE_CONTAINER extends
GenericContainer> {
+public abstract class SinkTester {
protected final String sinkType;
protected final Map<String, Object> sinkConfig;
@@ -35,7 +35,7 @@ public abstract class SinkTester<SINK_SERVICE_CONTAINER
extends GenericContainer
this.sinkConfig = Maps.newHashMap();
}
- protected abstract SINK_SERVICE_CONTAINER newSinkService(String
clusterName);
+ protected abstract Map<String, GenericContainer<?>> newSinkService(String
clusterName);
protected String sinkType() {
return sinkType;
@@ -45,7 +45,7 @@ public abstract class SinkTester<SINK_SERVICE_CONTAINER
extends GenericContainer
return sinkConfig;
}
- protected abstract void prepareSink();
+ protected abstract void prepareSink() throws Exception;
protected abstract void validateSinkResult(Map<String, String> kvs);