This is an automated email from the ASF dual-hosted git repository.

luoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new df0fc40  DRILL-7917: Update Kafka version to 2.8.0
df0fc40 is described below

commit df0fc401fd00c36c4741d5f4609d7c59d750b6f3
Author: Volodymyr Vysotskyi <[email protected]>
AuthorDate: Tue May 4 15:40:21 2021 +0300

    DRILL-7917: Update Kafka version to 2.8.0
---
 contrib/storage-kafka/pom.xml                      |  4 +--
 .../drill/exec/store/kafka/TestKafkaSuit.java      | 13 ++++---
 .../store/kafka/cluster/EmbeddedKafkaCluster.java  | 41 ++++++++++------------
 3 files changed, 30 insertions(+), 28 deletions(-)

diff --git a/contrib/storage-kafka/pom.xml b/contrib/storage-kafka/pom.xml
index 6b9c059..d626d40 100644
--- a/contrib/storage-kafka/pom.xml
+++ b/contrib/storage-kafka/pom.xml
@@ -31,7 +31,7 @@
   <name>Drill : Contrib : Storage : Kafka</name>
 
   <properties>
-    <kafka.version>2.3.1</kafka.version>
+    <kafka.version>2.8.0</kafka.version>
     <kafka.TestSuite>**/TestKafkaSuit.class</kafka.TestSuite>
   </properties>
 
@@ -64,7 +64,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka_2.12</artifactId>
+      <artifactId>kafka_2.13</artifactId>
       <version>${kafka.version}</version>
       <exclusions>
         <exclusion>
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
index 6886b05..7bb099d 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
@@ -17,8 +17,7 @@
  */
 package org.apache.drill.exec.store.kafka;
 
-import kafka.utils.ZKStringSerializer$;
-import org.I0Itec.zkclient.ZkClient;
+import kafka.zk.KafkaZkClient;
 import org.apache.drill.categories.KafkaStorageTest;
 import org.apache.drill.categories.SlowTest;
 import org.apache.drill.exec.ZookeeperTestUtil;
@@ -32,6 +31,8 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.security.JaasUtils;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.zookeeper.client.ZKClientConfig;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.experimental.categories.Category;
@@ -40,6 +41,7 @@ import org.junit.runners.Suite;
 import org.junit.runners.Suite.SuiteClasses;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.Option;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -59,7 +61,7 @@ public class TestKafkaSuit extends BaseTest {
 
   public static EmbeddedKafkaCluster embeddedKafkaCluster;
 
-  private static ZkClient zkClient;
+  private static KafkaZkClient zkClient;
 
   private static final AtomicInteger initCount = new AtomicInteger(0);
 
@@ -78,7 +80,10 @@ public class TestKafkaSuit extends BaseTest {
         ZookeeperTestUtil.setZookeeperSaslTestConfigProps();
         System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, ClassLoader.getSystemResource(LOGIN_CONF_RESOURCE_PATHNAME).getFile());
         embeddedKafkaCluster = new EmbeddedKafkaCluster();
-        zkClient = new ZkClient(embeddedKafkaCluster.getZkServer().getConnectionString(), SESSION_TIMEOUT, CONN_TIMEOUT, ZKStringSerializer$.MODULE$);
+        zkClient = KafkaZkClient.apply(embeddedKafkaCluster.getZkServer().getConnectionString(),
+            false, SESSION_TIMEOUT, CONN_TIMEOUT, 0, Time.SYSTEM,
+            "kafka.server", "SessionExpireListener",
+            Option.<String>empty(), Option.<ZKClientConfig>empty());
         createTopicHelper(TestQueryConstants.JSON_TOPIC, 1);
         KafkaMessageGenerator generator = new KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(), StringSerializer.class);
         generator.populateJsonMsgIntoKafka(TestQueryConstants.JSON_TOPIC, NUM_JSON_MSG);
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java
index eccc17a..71a425d 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java
@@ -23,20 +23,23 @@ import java.net.ServerSocket;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
+import java.util.stream.Collectors;
 
+import kafka.server.KafkaServer;
 import org.apache.drill.exec.ZookeeperHelper;
 import org.apache.drill.exec.store.kafka.KafkaAsyncCloser;
 import org.apache.drill.exec.store.kafka.TestQueryConstants;
+import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import kafka.server.KafkaConfig;
-import kafka.server.KafkaServerStartable;
+import scala.Option;
 
 public class EmbeddedKafkaCluster implements TestQueryConstants {
 
   private static final Logger logger = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
-  private List<KafkaServerStartable> brokers;
+  private List<KafkaServer> brokers;
   private ZookeeperHelper zkHelper;
   private KafkaAsyncCloser closer;
   private final Properties props;
@@ -93,8 +96,9 @@ public class EmbeddedKafkaCluster implements TestQueryConstants {
     brokers.add(getBroker(properties));
   }
 
-  private static KafkaServerStartable getBroker(Properties properties) {
-    KafkaServerStartable broker = new KafkaServerStartable(new KafkaConfig(properties));
+  private static KafkaServer getBroker(Properties properties) {
+    KafkaServer broker = new KafkaServer(new KafkaConfig(properties),
+        Time.SYSTEM, Option.<String>apply("kafka"), false);
     broker.startup();
     return broker;
   }
@@ -104,9 +108,7 @@ public class EmbeddedKafkaCluster implements TestQueryConstants {
     closer = null;
 
     if (brokers != null) {
-      for (KafkaServerStartable broker : brokers) {
-        broker.shutdown();
-      }
+      brokers.forEach(KafkaServer::shutdown);
       brokers = null;
     }
     if (zkHelper != null) {
@@ -116,12 +118,10 @@ public class EmbeddedKafkaCluster implements TestQueryConstants {
   }
 
   public void shutDownBroker(int brokerId) {
-    for (KafkaServerStartable broker : brokers) {
-      if (Integer.parseInt(broker.staticServerConfig().getString(KafkaConfig.BrokerIdProp())) == brokerId) {
-        broker.shutdown();
-        return;
-      }
-    }
+    brokers.stream()
+        .filter(broker -> Integer.parseInt(broker.config().getString(KafkaConfig.BrokerIdProp())) == brokerId)
+        .findAny()
+        .ifPresent(KafkaServer::shutdown);
   }
 
   public Properties getProps() {
@@ -130,11 +130,11 @@ public class EmbeddedKafkaCluster implements TestQueryConstants {
     return tmpProps;
   }
 
-  public List<KafkaServerStartable> getBrokers() {
+  public List<KafkaServer> getBrokers() {
     return brokers;
   }
 
-  public void setBrokers(List<KafkaServerStartable> brokers) {
+  public void setBrokers(List<KafkaServer> brokers) {
     this.brokers = brokers;
   }
 
@@ -143,13 +143,10 @@ public class EmbeddedKafkaCluster implements TestQueryConstants {
   }
 
   public String getKafkaBrokerList() {
-    StringBuilder sb = new StringBuilder();
-    for (KafkaServerStartable broker : brokers) {
-      KafkaConfig serverConfig = broker.staticServerConfig();
-      sb.append(serverConfig.hostName()).append(":").append(serverConfig.port());
-      sb.append(",");
-    }
-    return sb.toString().substring(0, sb.toString().length() - 1);
+    return brokers.stream()
+        .map(KafkaServer::config)
+        .map(serverConfig -> serverConfig.hostName() + ":" + serverConfig.port())
+        .collect(Collectors.joining(","));
   }
 
   public void registerToClose(AutoCloseable autoCloseable) {

Reply via email to