This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 70ac1bd Clean up KafkaPartitionLevelConsumerTest (#6803)
70ac1bd is described below
commit 70ac1bd99a64a951f588a9513e95441f32fea2cf
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Thu Apr 15 18:29:17 2021 -0700
Clean up KafkaPartitionLevelConsumerTest (#6803)
Simplify the logic of creating the kafka cluster in `MiniKafkaCluster`
Put `localhost` as the `host.name` to be in sync with the quick-start
---
.github/workflows/scripts/.pinot_test.sh | 4 +
.../kafka20/KafkaPartitionLevelConsumerTest.java | 63 +++++------
.../stream/kafka20/utils/EmbeddedZooKeeper.java | 39 +++----
.../stream/kafka20/utils/MiniKafkaCluster.java | 125 ++++++---------------
4 files changed, 79 insertions(+), 152 deletions(-)
diff --git a/.github/workflows/scripts/.pinot_test.sh
b/.github/workflows/scripts/.pinot_test.sh
index 1a745b3..1e7ac14 100755
--- a/.github/workflows/scripts/.pinot_test.sh
+++ b/.github/workflows/scripts/.pinot_test.sh
@@ -21,6 +21,10 @@
# Java version
java -version
+# Check network
+ifconfig
+netstat -i
+
# Only run integration tests if needed
if [ "$RUN_INTEGRATION_TESTS" != false ]; then
mvn test -B -P github-actions,integration-tests-only && exit 0 || exit 1
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java
index 90e72e2..eebffcb 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java
@@ -35,9 +35,6 @@ import org.apache.pinot.spi.stream.PartitionLevelConsumer;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
-import org.apache.pinot.spi.utils.NetUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -48,26 +45,22 @@ import org.testng.annotations.Test;
* Tests for the KafkaPartitionLevelConsumer.
*/
public class KafkaPartitionLevelConsumerTest {
-
- private static final Logger LOGGER =
LoggerFactory.getLogger(KafkaPartitionLevelConsumerTest.class);
private static final long STABILIZE_SLEEP_DELAYS = 3000;
private static final String TEST_TOPIC_1 = "foo";
private static final String TEST_TOPIC_2 = "bar";
private static final int NUM_MSG_PRODUCED_PER_PARTITION = 1000;
- private MiniKafkaCluster kafkaCluster;
- private String brokerAddress;
+ private MiniKafkaCluster _kafkaCluster;
+ private String _kafkaBrokerAddress;
@BeforeClass
- public void setup()
+ public void setUp()
throws Exception {
- kafkaCluster = new MiniKafkaCluster.Builder().newServer("0").build();
- LOGGER.info("Trying to start MiniKafkaCluster");
- kafkaCluster.start();
- brokerAddress = NetUtils.getHostAddress() + ":" +
kafkaCluster.getKafkaServerPort(0);
- LOGGER.info("Kafka Broker Address is {}", brokerAddress);
- kafkaCluster.createTopic(TEST_TOPIC_1, 1, 1);
- kafkaCluster.createTopic(TEST_TOPIC_2, 2, 1);
+ _kafkaCluster = new MiniKafkaCluster("0");
+ _kafkaCluster.start();
+ _kafkaBrokerAddress = _kafkaCluster.getKafkaServerAddress();
+ _kafkaCluster.createTopic(TEST_TOPIC_1, 1, 1);
+ _kafkaCluster.createTopic(TEST_TOPIC_2, 2, 1);
Thread.sleep(STABILIZE_SLEEP_DELAYS);
produceMsgToKafka();
Thread.sleep(STABILIZE_SLEEP_DELAYS);
@@ -75,30 +68,26 @@ public class KafkaPartitionLevelConsumerTest {
private void produceMsgToKafka() {
Properties props = new Properties();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
- props.put(ProducerConfig.CLIENT_ID_CONFIG, this.getClass().getName());
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, _kafkaBrokerAddress);
+ props.put(ProducerConfig.CLIENT_ID_CONFIG, "clientId");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
- props.put(ProducerConfig.ACKS_CONFIG, "1");
- KafkaProducer p = new KafkaProducer<>(props);
- for (int i = 0; i < NUM_MSG_PRODUCED_PER_PARTITION; i++) {
- p.send(new ProducerRecord(TEST_TOPIC_1, "sample_msg_" + i));
- p.flush();
- // TEST_TOPIC_2 has 2 partitions
- p.send(new ProducerRecord(TEST_TOPIC_2, "sample_msg_" + i));
- p.flush();
- p.send(new ProducerRecord(TEST_TOPIC_2, "sample_msg_" + i));
- p.flush();
+ try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
+ for (int i = 0; i < NUM_MSG_PRODUCED_PER_PARTITION; i++) {
+ producer.send(new ProducerRecord<>(TEST_TOPIC_1, "sample_msg_" + i));
+ // TEST_TOPIC_2 has 2 partitions
+ producer.send(new ProducerRecord<>(TEST_TOPIC_2, "sample_msg_" + i));
+ producer.send(new ProducerRecord<>(TEST_TOPIC_2, "sample_msg_" + i));
+ }
}
- p.close();
}
@AfterClass
- public void shutDown()
+ public void tearDown()
throws Exception {
- kafkaCluster.deleteTopic(TEST_TOPIC_1);
- kafkaCluster.deleteTopic(TEST_TOPIC_2);
- kafkaCluster.close();
+ _kafkaCluster.deleteTopic(TEST_TOPIC_1);
+ _kafkaCluster.deleteTopic(TEST_TOPIC_2);
+ _kafkaCluster.close();
}
@Test
@@ -106,7 +95,7 @@ public class KafkaPartitionLevelConsumerTest {
throws Exception {
String streamType = "kafka";
String streamKafkaTopicName = "theTopic";
- String streamKafkaBrokerList = brokerAddress;
+ String streamKafkaBrokerList = _kafkaBrokerAddress;
String streamKafkaConsumerType = "simple";
String clientId = "clientId";
String tableNameWithType = "tableName_REALTIME";
@@ -150,7 +139,7 @@ public class KafkaPartitionLevelConsumerTest {
@Test
public void testGetPartitionCount() {
String streamType = "kafka";
- String streamKafkaBrokerList = brokerAddress;
+ String streamKafkaBrokerList = _kafkaBrokerAddress;
String streamKafkaConsumerType = "simple";
String clientId = "clientId";
String tableNameWithType = "tableName_REALTIME";
@@ -185,7 +174,7 @@ public class KafkaPartitionLevelConsumerTest {
throws Exception {
String streamType = "kafka";
String streamKafkaTopicName = "theTopic";
- String streamKafkaBrokerList = brokerAddress;
+ String streamKafkaBrokerList = _kafkaBrokerAddress;
String streamKafkaConsumerType = "simple";
String clientId = "clientId";
String tableNameWithType = "tableName_REALTIME";
@@ -215,7 +204,7 @@ public class KafkaPartitionLevelConsumerTest {
private void testFetchOffsets(String topic)
throws Exception {
String streamType = "kafka";
- String streamKafkaBrokerList = brokerAddress;
+ String streamKafkaBrokerList = _kafkaBrokerAddress;
String streamKafkaConsumerType = "simple";
String clientId = "clientId";
String tableNameWithType = "tableName_REALTIME";
@@ -250,7 +239,7 @@ public class KafkaPartitionLevelConsumerTest {
private void testConsumer(String topic)
throws TimeoutException {
String streamType = "kafka";
- String streamKafkaBrokerList = brokerAddress;
+ String streamKafkaBrokerList = _kafkaBrokerAddress;
String streamKafkaConsumerType = "simple";
String clientId = "clientId";
String tableNameWithType = "tableName_REALTIME";
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/utils/EmbeddedZooKeeper.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/utils/EmbeddedZooKeeper.java
index e66345d..d83cf63 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/utils/EmbeddedZooKeeper.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/utils/EmbeddedZooKeeper.java
@@ -21,49 +21,36 @@ package org.apache.pinot.plugin.stream.kafka20.utils;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
-import java.net.DatagramSocket;
-import java.net.Inet4Address;
-import java.net.InetAddress;
import java.net.InetSocketAddress;
-import java.net.NetworkInterface;
-import java.net.SocketException;
-import java.net.UnknownHostException;
-import java.nio.file.Files;
-import java.util.Enumeration;
import org.apache.commons.io.FileUtils;
-import org.apache.pinot.spi.utils.NetUtils;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
public class EmbeddedZooKeeper implements Closeable {
-
+ private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(),
"EmbeddedZooKeeper");
private static final int TICK_TIME = 500;
- private final NIOServerCnxnFactory factory;
- private final ZooKeeperServer zookeeper;
- private final File tmpDir;
- private final int port;
+
+ private final NIOServerCnxnFactory _factory;
+ private final String _zkAddress;
EmbeddedZooKeeper()
throws IOException, InterruptedException {
- this.tmpDir = Files.createTempDirectory(null).toFile();
- this.factory = new NIOServerCnxnFactory();
- this.zookeeper = new ZooKeeperServer(new File(tmpDir, "data"), new
File(tmpDir, "log"), TICK_TIME);
- InetSocketAddress addr = new InetSocketAddress(NetUtils.getHostAddress(),
0);
- factory.configure(addr, 0);
- factory.startup(zookeeper);
- this.port = zookeeper.getClientPort();
+ _factory = new NIOServerCnxnFactory();
+ ZooKeeperServer zkServer = new ZooKeeperServer(new File(TEMP_DIR, "data"),
new File(TEMP_DIR, "log"), TICK_TIME);
+ _factory.configure(new InetSocketAddress("localhost", 0), 0);
+ _factory.startup(zkServer);
+ _zkAddress = "localhost:" + zkServer.getClientPort();
}
- public int getPort() {
- return port;
+ public String getZkAddress() {
+ return _zkAddress;
}
@Override
public void close()
throws IOException {
- zookeeper.shutdown();
- factory.shutdown();
- FileUtils.deleteDirectory(tmpDir);
+ _factory.shutdown();
+ FileUtils.deleteDirectory(TEMP_DIR);
}
}
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/utils/MiniKafkaCluster.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/utils/MiniKafkaCluster.java
index 98bbbb4..4e82248 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/utils/MiniKafkaCluster.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/utils/MiniKafkaCluster.java
@@ -19,67 +19,47 @@
package org.apache.pinot.plugin.stream.kafka20.utils;
import java.io.Closeable;
+import java.io.File;
import java.io.IOException;
import java.net.ServerSocket;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
-import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.CreateTopicsResult;
-import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.utils.Time;
-import org.apache.pinot.spi.utils.NetUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import scala.Option;
+import scala.collection.JavaConverters;
import scala.collection.Seq;
public final class MiniKafkaCluster implements Closeable {
+ private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(),
"MiniKafkaCluster");
- private static final Logger LOGGER =
LoggerFactory.getLogger(MiniKafkaCluster.class);
- private final EmbeddedZooKeeper zkServer;
- private final List<KafkaServer> kafkaServer;
- private final List<Integer> kafkaPorts;
- private final Path tempDir;
- private final AdminClient adminClient;
- private final String zkUrl;
+ private final EmbeddedZooKeeper _zkServer;
+ private final KafkaServer _kafkaServer;
+ private final String _kafkaServerAddress;
+ private final AdminClient _adminClient;
@SuppressWarnings({"rawtypes", "unchecked"})
- private MiniKafkaCluster(List<String> brokerIds)
+ public MiniKafkaCluster(String brokerId)
throws IOException, InterruptedException {
- this.zkServer = new EmbeddedZooKeeper();
- this.zkUrl = NetUtils.getHostAddress() + ":" + zkServer.getPort();
- LOGGER.info("MiniKafkaCluster Zookeeper Url is {}", zkUrl);
- this.tempDir =
Files.createTempDirectory(Paths.get(System.getProperty("java.io.tmpdir")),
"mini-kafka-cluster");
- this.kafkaServer = new ArrayList<>();
- this.kafkaPorts = new ArrayList<>();
- for (String id : brokerIds) {
- int port = getAvailablePort();
- LOGGER.info("Generate broker id = {}, port = {}", id, port);
- KafkaConfig c = new KafkaConfig(createBrokerConfig(id, port));
- Seq seq =
-
scala.collection.JavaConverters.collectionAsScalaIterableConverter(Collections.emptyList()).asScala().toSeq();
- kafkaServer.add(new KafkaServer(c, Time.SYSTEM, Option.empty(), seq));
- kafkaPorts.add(port);
- }
- Properties props = new Properties();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
NetUtils.getHostAddress() + ":" + getKafkaServerPort(0));
- adminClient = AdminClient.create(props);
+ _zkServer = new EmbeddedZooKeeper();
+ int kafkaServerPort = getAvailablePort();
+ KafkaConfig kafkaBrokerConfig = new
KafkaConfig(createBrokerConfig(brokerId, kafkaServerPort));
+ Seq seq =
JavaConverters.collectionAsScalaIterableConverter(Collections.emptyList()).asScala().toSeq();
+ _kafkaServer = new KafkaServer(kafkaBrokerConfig, Time.SYSTEM,
Option.empty(), seq);
+ _kafkaServerAddress = "localhost:" + kafkaServerPort;
+ Properties kafkaClientConfig = new Properties();
+ kafkaClientConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
_kafkaServerAddress);
+ _adminClient = AdminClient.create(kafkaClientConfig);
}
- static int getAvailablePort() {
+ private static int getAvailablePort() {
try {
try (ServerSocket socket = new ServerSocket(0)) {
return socket.getLocalPort();
@@ -89,17 +69,16 @@ public final class MiniKafkaCluster implements Closeable {
}
}
- private Properties createBrokerConfig(String nodeId, int port)
- throws IOException {
+ private Properties createBrokerConfig(String brokerId, int port) {
Properties props = new Properties();
- props.put("broker.id", nodeId);
+ props.put("broker.id", brokerId);
// We need to explicitly set the network interface we want to let Kafka
bind to.
// By default, it will bind to all the network interfaces, which might not
be accessible always
// in a container based environment.
- props.put("host.name", NetUtils.getHostAddress());
+ props.put("host.name", "localhost");
props.put("port", Integer.toString(port));
- props.put("log.dir", Files.createTempDirectory(tempDir,
"broker-").toAbsolutePath().toString());
- props.put("zookeeper.connect", zkUrl);
+ props.put("log.dir", new File(TEMP_DIR, "log").getPath());
+ props.put("zookeeper.connect", _zkServer.getZkAddress());
props.put("replica.socket.timeout.ms", "1500");
props.put("controller.socket.timeout.ms", "1500");
props.put("controlled.shutdown.enable", "true");
@@ -112,61 +91,29 @@ public final class MiniKafkaCluster implements Closeable {
}
public void start() {
- for (KafkaServer s : kafkaServer) {
- s.startup();
- }
+ _kafkaServer.startup();
}
@Override
public void close()
throws IOException {
- adminClient.close();
- for (KafkaServer s : kafkaServer) {
- s.shutdown();
- }
- this.zkServer.close();
- FileUtils.deleteDirectory(tempDir.toFile());
+ _kafkaServer.shutdown();
+ _zkServer.close();
+ FileUtils.deleteDirectory(TEMP_DIR);
}
- public int getKafkaServerPort(int index) {
- return kafkaPorts.get(index);
+ public String getKafkaServerAddress() {
+ return _kafkaServerAddress;
}
- public boolean createTopic(String topicName, int numPartitions, int
replicationFactor) {
+ public void createTopic(String topicName, int numPartitions, int
replicationFactor)
+ throws ExecutionException, InterruptedException {
NewTopic newTopic = new NewTopic(topicName, numPartitions, (short)
replicationFactor);
- CreateTopicsResult createTopicsResult =
this.adminClient.createTopics(Arrays.asList(newTopic));
- try {
- createTopicsResult.all().get();
- } catch (InterruptedException | ExecutionException e) {
- LOGGER.error("Failed to create Kafka topic: {}, Exception: {}",
newTopic.toString(), e);
- return false;
- }
- return true;
- }
-
- public boolean deleteTopic(String topicName) {
- final DeleteTopicsResult deleteTopicsResult =
this.adminClient.deleteTopics(Collections.singletonList(topicName));
- try {
- deleteTopicsResult.all().get();
- } catch (InterruptedException | ExecutionException e) {
- LOGGER.error("Failed to delete Kafka topic: {}, Exception: {}",
topicName, e);
- return false;
- }
- return true;
+ _adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
}
- public static class Builder {
-
- private List<String> brokerIds = new ArrayList<>();
-
- public Builder newServer(String brokerId) {
- brokerIds.add(brokerId);
- return this;
- }
-
- public MiniKafkaCluster build()
- throws IOException, InterruptedException {
- return new MiniKafkaCluster(brokerIds);
- }
+ public void deleteTopic(String topicName)
+ throws ExecutionException, InterruptedException {
+
_adminClient.deleteTopics(Collections.singletonList(topicName)).all().get();
}
-}
\ No newline at end of file
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]