This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 70ac1bd  Clean up KafkaPartitionLevelConsumerTest (#6803)
70ac1bd is described below

commit 70ac1bd99a64a951f588a9513e95441f32fea2cf
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Thu Apr 15 18:29:17 2021 -0700

    Clean up KafkaPartitionLevelConsumerTest (#6803)
    
    Simplify the logic of creating the kafka cluster in `MiniKafkaCluster`
    Use `localhost` as the `host.name` to stay in sync with the quick-start
---
 .github/workflows/scripts/.pinot_test.sh           |   4 +
 .../kafka20/KafkaPartitionLevelConsumerTest.java   |  63 +++++------
 .../stream/kafka20/utils/EmbeddedZooKeeper.java    |  39 +++----
 .../stream/kafka20/utils/MiniKafkaCluster.java     | 125 ++++++---------------
 4 files changed, 79 insertions(+), 152 deletions(-)

diff --git a/.github/workflows/scripts/.pinot_test.sh 
b/.github/workflows/scripts/.pinot_test.sh
index 1a745b3..1e7ac14 100755
--- a/.github/workflows/scripts/.pinot_test.sh
+++ b/.github/workflows/scripts/.pinot_test.sh
@@ -21,6 +21,10 @@
 # Java version
 java -version
 
+# Check network
+ifconfig
+netstat -i
+
 # Only run integration tests if needed
 if [ "$RUN_INTEGRATION_TESTS" != false ]; then
   mvn test -B -P github-actions,integration-tests-only && exit 0 || exit 1
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java
index 90e72e2..eebffcb 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java
@@ -35,9 +35,6 @@ import org.apache.pinot.spi.stream.PartitionLevelConsumer;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamConsumerFactory;
 import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
-import org.apache.pinot.spi.utils.NetUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -48,26 +45,22 @@ import org.testng.annotations.Test;
  * Tests for the KafkaPartitionLevelConsumer.
  */
 public class KafkaPartitionLevelConsumerTest {
-
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(KafkaPartitionLevelConsumerTest.class);
   private static final long STABILIZE_SLEEP_DELAYS = 3000;
   private static final String TEST_TOPIC_1 = "foo";
   private static final String TEST_TOPIC_2 = "bar";
   private static final int NUM_MSG_PRODUCED_PER_PARTITION = 1000;
 
-  private MiniKafkaCluster kafkaCluster;
-  private String brokerAddress;
+  private MiniKafkaCluster _kafkaCluster;
+  private String _kafkaBrokerAddress;
 
   @BeforeClass
-  public void setup()
+  public void setUp()
       throws Exception {
-    kafkaCluster = new MiniKafkaCluster.Builder().newServer("0").build();
-    LOGGER.info("Trying to start MiniKafkaCluster");
-    kafkaCluster.start();
-    brokerAddress = NetUtils.getHostAddress() + ":" + 
kafkaCluster.getKafkaServerPort(0);
-    LOGGER.info("Kafka Broker Address is {}", brokerAddress);
-    kafkaCluster.createTopic(TEST_TOPIC_1, 1, 1);
-    kafkaCluster.createTopic(TEST_TOPIC_2, 2, 1);
+    _kafkaCluster = new MiniKafkaCluster("0");
+    _kafkaCluster.start();
+    _kafkaBrokerAddress = _kafkaCluster.getKafkaServerAddress();
+    _kafkaCluster.createTopic(TEST_TOPIC_1, 1, 1);
+    _kafkaCluster.createTopic(TEST_TOPIC_2, 2, 1);
     Thread.sleep(STABILIZE_SLEEP_DELAYS);
     produceMsgToKafka();
     Thread.sleep(STABILIZE_SLEEP_DELAYS);
@@ -75,30 +68,26 @@ public class KafkaPartitionLevelConsumerTest {
 
   private void produceMsgToKafka() {
     Properties props = new Properties();
-    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
-    props.put(ProducerConfig.CLIENT_ID_CONFIG, this.getClass().getName());
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, _kafkaBrokerAddress);
+    props.put(ProducerConfig.CLIENT_ID_CONFIG, "clientId");
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
-    props.put(ProducerConfig.ACKS_CONFIG, "1");
-    KafkaProducer p = new KafkaProducer<>(props);
-    for (int i = 0; i < NUM_MSG_PRODUCED_PER_PARTITION; i++) {
-      p.send(new ProducerRecord(TEST_TOPIC_1, "sample_msg_" + i));
-      p.flush();
-      // TEST_TOPIC_2 has 2 partitions
-      p.send(new ProducerRecord(TEST_TOPIC_2, "sample_msg_" + i));
-      p.flush();
-      p.send(new ProducerRecord(TEST_TOPIC_2, "sample_msg_" + i));
-      p.flush();
+    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
+      for (int i = 0; i < NUM_MSG_PRODUCED_PER_PARTITION; i++) {
+        producer.send(new ProducerRecord<>(TEST_TOPIC_1, "sample_msg_" + i));
+        // TEST_TOPIC_2 has 2 partitions
+        producer.send(new ProducerRecord<>(TEST_TOPIC_2, "sample_msg_" + i));
+        producer.send(new ProducerRecord<>(TEST_TOPIC_2, "sample_msg_" + i));
+      }
     }
-    p.close();
   }
 
   @AfterClass
-  public void shutDown()
+  public void tearDown()
       throws Exception {
-    kafkaCluster.deleteTopic(TEST_TOPIC_1);
-    kafkaCluster.deleteTopic(TEST_TOPIC_2);
-    kafkaCluster.close();
+    _kafkaCluster.deleteTopic(TEST_TOPIC_1);
+    _kafkaCluster.deleteTopic(TEST_TOPIC_2);
+    _kafkaCluster.close();
   }
 
   @Test
@@ -106,7 +95,7 @@ public class KafkaPartitionLevelConsumerTest {
       throws Exception {
     String streamType = "kafka";
     String streamKafkaTopicName = "theTopic";
-    String streamKafkaBrokerList = brokerAddress;
+    String streamKafkaBrokerList = _kafkaBrokerAddress;
     String streamKafkaConsumerType = "simple";
     String clientId = "clientId";
     String tableNameWithType = "tableName_REALTIME";
@@ -150,7 +139,7 @@ public class KafkaPartitionLevelConsumerTest {
   @Test
   public void testGetPartitionCount() {
     String streamType = "kafka";
-    String streamKafkaBrokerList = brokerAddress;
+    String streamKafkaBrokerList = _kafkaBrokerAddress;
     String streamKafkaConsumerType = "simple";
     String clientId = "clientId";
     String tableNameWithType = "tableName_REALTIME";
@@ -185,7 +174,7 @@ public class KafkaPartitionLevelConsumerTest {
       throws Exception {
     String streamType = "kafka";
     String streamKafkaTopicName = "theTopic";
-    String streamKafkaBrokerList = brokerAddress;
+    String streamKafkaBrokerList = _kafkaBrokerAddress;
     String streamKafkaConsumerType = "simple";
     String clientId = "clientId";
     String tableNameWithType = "tableName_REALTIME";
@@ -215,7 +204,7 @@ public class KafkaPartitionLevelConsumerTest {
   private void testFetchOffsets(String topic)
       throws Exception {
     String streamType = "kafka";
-    String streamKafkaBrokerList = brokerAddress;
+    String streamKafkaBrokerList = _kafkaBrokerAddress;
     String streamKafkaConsumerType = "simple";
     String clientId = "clientId";
     String tableNameWithType = "tableName_REALTIME";
@@ -250,7 +239,7 @@ public class KafkaPartitionLevelConsumerTest {
   private void testConsumer(String topic)
       throws TimeoutException {
     String streamType = "kafka";
-    String streamKafkaBrokerList = brokerAddress;
+    String streamKafkaBrokerList = _kafkaBrokerAddress;
     String streamKafkaConsumerType = "simple";
     String clientId = "clientId";
     String tableNameWithType = "tableName_REALTIME";
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/utils/EmbeddedZooKeeper.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/utils/EmbeddedZooKeeper.java
index e66345d..d83cf63 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/utils/EmbeddedZooKeeper.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/utils/EmbeddedZooKeeper.java
@@ -21,49 +21,36 @@ package org.apache.pinot.plugin.stream.kafka20.utils;
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
-import java.net.DatagramSocket;
-import java.net.Inet4Address;
-import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.net.NetworkInterface;
-import java.net.SocketException;
-import java.net.UnknownHostException;
-import java.nio.file.Files;
-import java.util.Enumeration;
 import org.apache.commons.io.FileUtils;
-import org.apache.pinot.spi.utils.NetUtils;
 import org.apache.zookeeper.server.NIOServerCnxnFactory;
 import org.apache.zookeeper.server.ZooKeeperServer;
 
 
 public class EmbeddedZooKeeper implements Closeable {
-
+  private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), 
"EmbeddedZooKeeper");
   private static final int TICK_TIME = 500;
-  private final NIOServerCnxnFactory factory;
-  private final ZooKeeperServer zookeeper;
-  private final File tmpDir;
-  private final int port;
+
+  private final NIOServerCnxnFactory _factory;
+  private final String _zkAddress;
 
   EmbeddedZooKeeper()
       throws IOException, InterruptedException {
-    this.tmpDir = Files.createTempDirectory(null).toFile();
-    this.factory = new NIOServerCnxnFactory();
-    this.zookeeper = new ZooKeeperServer(new File(tmpDir, "data"), new 
File(tmpDir, "log"), TICK_TIME);
-    InetSocketAddress addr = new InetSocketAddress(NetUtils.getHostAddress(), 
0);
-    factory.configure(addr, 0);
-    factory.startup(zookeeper);
-    this.port = zookeeper.getClientPort();
+    _factory = new NIOServerCnxnFactory();
+    ZooKeeperServer zkServer = new ZooKeeperServer(new File(TEMP_DIR, "data"), 
new File(TEMP_DIR, "log"), TICK_TIME);
+    _factory.configure(new InetSocketAddress("localhost", 0), 0);
+    _factory.startup(zkServer);
+    _zkAddress = "localhost:" + zkServer.getClientPort();
   }
 
-  public int getPort() {
-    return port;
+  public String getZkAddress() {
+    return _zkAddress;
   }
 
   @Override
   public void close()
       throws IOException {
-    zookeeper.shutdown();
-    factory.shutdown();
-    FileUtils.deleteDirectory(tmpDir);
+    _factory.shutdown();
+    FileUtils.deleteDirectory(TEMP_DIR);
   }
 }
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/utils/MiniKafkaCluster.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/utils/MiniKafkaCluster.java
index 98bbbb4..4e82248 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/utils/MiniKafkaCluster.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/utils/MiniKafkaCluster.java
@@ -19,67 +19,47 @@
 package org.apache.pinot.plugin.stream.kafka20.utils;
 
 import java.io.Closeable;
+import java.io.File;
 import java.io.IOException;
 import java.net.ServerSocket;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
-import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
 import org.apache.commons.io.FileUtils;
 import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.CreateTopicsResult;
-import org.apache.kafka.clients.admin.DeleteTopicsResult;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.utils.Time;
-import org.apache.pinot.spi.utils.NetUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import scala.Option;
+import scala.collection.JavaConverters;
 import scala.collection.Seq;
 
 
 public final class MiniKafkaCluster implements Closeable {
+  private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), 
"MiniKafkaCluster");
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(MiniKafkaCluster.class);
-  private final EmbeddedZooKeeper zkServer;
-  private final List<KafkaServer> kafkaServer;
-  private final List<Integer> kafkaPorts;
-  private final Path tempDir;
-  private final AdminClient adminClient;
-  private final String zkUrl;
+  private final EmbeddedZooKeeper _zkServer;
+  private final KafkaServer _kafkaServer;
+  private final String _kafkaServerAddress;
+  private final AdminClient _adminClient;
 
   @SuppressWarnings({"rawtypes", "unchecked"})
-  private MiniKafkaCluster(List<String> brokerIds)
+  public MiniKafkaCluster(String brokerId)
       throws IOException, InterruptedException {
-    this.zkServer = new EmbeddedZooKeeper();
-    this.zkUrl = NetUtils.getHostAddress() + ":" + zkServer.getPort();
-    LOGGER.info("MiniKafkaCluster Zookeeper Url is {}", zkUrl);
-    this.tempDir = 
Files.createTempDirectory(Paths.get(System.getProperty("java.io.tmpdir")), 
"mini-kafka-cluster");
-    this.kafkaServer = new ArrayList<>();
-    this.kafkaPorts = new ArrayList<>();
-    for (String id : brokerIds) {
-      int port = getAvailablePort();
-      LOGGER.info("Generate broker id = {}, port = {}", id, port);
-      KafkaConfig c = new KafkaConfig(createBrokerConfig(id, port));
-      Seq seq =
-          
scala.collection.JavaConverters.collectionAsScalaIterableConverter(Collections.emptyList()).asScala().toSeq();
-      kafkaServer.add(new KafkaServer(c, Time.SYSTEM, Option.empty(), seq));
-      kafkaPorts.add(port);
-    }
-    Properties props = new Properties();
-    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
NetUtils.getHostAddress() + ":" + getKafkaServerPort(0));
-    adminClient = AdminClient.create(props);
+    _zkServer = new EmbeddedZooKeeper();
+    int kafkaServerPort = getAvailablePort();
+    KafkaConfig kafkaBrokerConfig = new 
KafkaConfig(createBrokerConfig(brokerId, kafkaServerPort));
+    Seq seq = 
JavaConverters.collectionAsScalaIterableConverter(Collections.emptyList()).asScala().toSeq();
+    _kafkaServer = new KafkaServer(kafkaBrokerConfig, Time.SYSTEM, 
Option.empty(), seq);
+    _kafkaServerAddress = "localhost:" + kafkaServerPort;
+    Properties kafkaClientConfig = new Properties();
+    kafkaClientConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
_kafkaServerAddress);
+    _adminClient = AdminClient.create(kafkaClientConfig);
   }
 
-  static int getAvailablePort() {
+  private static int getAvailablePort() {
     try {
       try (ServerSocket socket = new ServerSocket(0)) {
         return socket.getLocalPort();
@@ -89,17 +69,16 @@ public final class MiniKafkaCluster implements Closeable {
     }
   }
 
-  private Properties createBrokerConfig(String nodeId, int port)
-      throws IOException {
+  private Properties createBrokerConfig(String brokerId, int port) {
     Properties props = new Properties();
-    props.put("broker.id", nodeId);
+    props.put("broker.id", brokerId);
     // We need to explicitly set the network interface we want to let Kafka 
bind to.
     // By default, it will bind to all the network interfaces, which might not 
be accessible always
     // in a container based environment.
-    props.put("host.name", NetUtils.getHostAddress());
+    props.put("host.name", "localhost");
     props.put("port", Integer.toString(port));
-    props.put("log.dir", Files.createTempDirectory(tempDir, 
"broker-").toAbsolutePath().toString());
-    props.put("zookeeper.connect", zkUrl);
+    props.put("log.dir", new File(TEMP_DIR, "log").getPath());
+    props.put("zookeeper.connect", _zkServer.getZkAddress());
     props.put("replica.socket.timeout.ms", "1500");
     props.put("controller.socket.timeout.ms", "1500");
     props.put("controlled.shutdown.enable", "true");
@@ -112,61 +91,29 @@ public final class MiniKafkaCluster implements Closeable {
   }
 
   public void start() {
-    for (KafkaServer s : kafkaServer) {
-      s.startup();
-    }
+    _kafkaServer.startup();
   }
 
   @Override
   public void close()
       throws IOException {
-    adminClient.close();
-    for (KafkaServer s : kafkaServer) {
-      s.shutdown();
-    }
-    this.zkServer.close();
-    FileUtils.deleteDirectory(tempDir.toFile());
+    _kafkaServer.shutdown();
+    _zkServer.close();
+    FileUtils.deleteDirectory(TEMP_DIR);
   }
 
-  public int getKafkaServerPort(int index) {
-    return kafkaPorts.get(index);
+  public String getKafkaServerAddress() {
+    return _kafkaServerAddress;
   }
 
-  public boolean createTopic(String topicName, int numPartitions, int 
replicationFactor) {
+  public void createTopic(String topicName, int numPartitions, int 
replicationFactor)
+      throws ExecutionException, InterruptedException {
     NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) 
replicationFactor);
-    CreateTopicsResult createTopicsResult = 
this.adminClient.createTopics(Arrays.asList(newTopic));
-    try {
-      createTopicsResult.all().get();
-    } catch (InterruptedException | ExecutionException e) {
-      LOGGER.error("Failed to create Kafka topic: {}, Exception: {}", 
newTopic.toString(), e);
-      return false;
-    }
-    return true;
-  }
-
-  public boolean deleteTopic(String topicName) {
-    final DeleteTopicsResult deleteTopicsResult = 
this.adminClient.deleteTopics(Collections.singletonList(topicName));
-    try {
-      deleteTopicsResult.all().get();
-    } catch (InterruptedException | ExecutionException e) {
-      LOGGER.error("Failed to delete Kafka topic: {}, Exception: {}", 
topicName, e);
-      return false;
-    }
-    return true;
+    _adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
   }
 
-  public static class Builder {
-
-    private List<String> brokerIds = new ArrayList<>();
-
-    public Builder newServer(String brokerId) {
-      brokerIds.add(brokerId);
-      return this;
-    }
-
-    public MiniKafkaCluster build()
-        throws IOException, InterruptedException {
-      return new MiniKafkaCluster(brokerIds);
-    }
+  public void deleteTopic(String topicName)
+      throws ExecutionException, InterruptedException {
+    
_adminClient.deleteTopics(Collections.singletonList(topicName)).all().get();
   }
-}
\ No newline at end of file
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to