This is an automated email from the ASF dual-hosted git repository.
luoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new df0fc40 DRILL-7917: Update Kafka version to 2.8.0
df0fc40 is described below
commit df0fc401fd00c36c4741d5f4609d7c59d750b6f3
Author: Volodymyr Vysotskyi <[email protected]>
AuthorDate: Tue May 4 15:40:21 2021 +0300
DRILL-7917: Update Kafka version to 2.8.0
---
contrib/storage-kafka/pom.xml | 4 +--
.../drill/exec/store/kafka/TestKafkaSuit.java | 13 ++++---
.../store/kafka/cluster/EmbeddedKafkaCluster.java | 41 ++++++++++------------
3 files changed, 30 insertions(+), 28 deletions(-)
diff --git a/contrib/storage-kafka/pom.xml b/contrib/storage-kafka/pom.xml
index 6b9c059..d626d40 100644
--- a/contrib/storage-kafka/pom.xml
+++ b/contrib/storage-kafka/pom.xml
@@ -31,7 +31,7 @@
<name>Drill : Contrib : Storage : Kafka</name>
<properties>
- <kafka.version>2.3.1</kafka.version>
+ <kafka.version>2.8.0</kafka.version>
<kafka.TestSuite>**/TestKafkaSuit.class</kafka.TestSuite>
</properties>
@@ -64,7 +64,7 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.12</artifactId>
+ <artifactId>kafka_2.13</artifactId>
<version>${kafka.version}</version>
<exclusions>
<exclusion>
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
index 6886b05..7bb099d 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
@@ -17,8 +17,7 @@
*/
package org.apache.drill.exec.store.kafka;
-import kafka.utils.ZKStringSerializer$;
-import org.I0Itec.zkclient.ZkClient;
+import kafka.zk.KafkaZkClient;
import org.apache.drill.categories.KafkaStorageTest;
import org.apache.drill.categories.SlowTest;
import org.apache.drill.exec.ZookeeperTestUtil;
@@ -32,6 +31,8 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.zookeeper.client.ZKClientConfig;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
@@ -40,6 +41,7 @@ import org.junit.runners.Suite;
import org.junit.runners.Suite.SuiteClasses;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.Option;
import java.util.Collections;
import java.util.HashMap;
@@ -59,7 +61,7 @@ public class TestKafkaSuit extends BaseTest {
public static EmbeddedKafkaCluster embeddedKafkaCluster;
- private static ZkClient zkClient;
+ private static KafkaZkClient zkClient;
private static final AtomicInteger initCount = new AtomicInteger(0);
@@ -78,7 +80,10 @@ public class TestKafkaSuit extends BaseTest {
ZookeeperTestUtil.setZookeeperSaslTestConfigProps();
System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM,
ClassLoader.getSystemResource(LOGIN_CONF_RESOURCE_PATHNAME).getFile());
embeddedKafkaCluster = new EmbeddedKafkaCluster();
- zkClient = new ZkClient(embeddedKafkaCluster.getZkServer().getConnectionString(), SESSION_TIMEOUT, CONN_TIMEOUT, ZKStringSerializer$.MODULE$);
+ zkClient = KafkaZkClient.apply(embeddedKafkaCluster.getZkServer().getConnectionString(),
+ false, SESSION_TIMEOUT, CONN_TIMEOUT, 0, Time.SYSTEM,
+ "kafka.server", "SessionExpireListener",
+ Option.<String>empty(), Option.<ZKClientConfig>empty());
createTopicHelper(TestQueryConstants.JSON_TOPIC, 1);
KafkaMessageGenerator generator = new KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(), StringSerializer.class);
generator.populateJsonMsgIntoKafka(TestQueryConstants.JSON_TOPIC, NUM_JSON_MSG);
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java
index eccc17a..71a425d 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java
@@ -23,20 +23,23 @@ import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
+import java.util.stream.Collectors;
+import kafka.server.KafkaServer;
import org.apache.drill.exec.ZookeeperHelper;
import org.apache.drill.exec.store.kafka.KafkaAsyncCloser;
import org.apache.drill.exec.store.kafka.TestQueryConstants;
+import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import kafka.server.KafkaConfig;
-import kafka.server.KafkaServerStartable;
+import scala.Option;
public class EmbeddedKafkaCluster implements TestQueryConstants {
private static final Logger logger = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
- private List<KafkaServerStartable> brokers;
+ private List<KafkaServer> brokers;
private ZookeeperHelper zkHelper;
private KafkaAsyncCloser closer;
private final Properties props;
@@ -93,8 +96,9 @@ public class EmbeddedKafkaCluster implements TestQueryConstants {
brokers.add(getBroker(properties));
}
- private static KafkaServerStartable getBroker(Properties properties) {
- KafkaServerStartable broker = new KafkaServerStartable(new KafkaConfig(properties));
+ private static KafkaServer getBroker(Properties properties) {
+ KafkaServer broker = new KafkaServer(new KafkaConfig(properties),
+ Time.SYSTEM, Option.<String>apply("kafka"), false);
broker.startup();
return broker;
}
@@ -104,9 +108,7 @@ public class EmbeddedKafkaCluster implements TestQueryConstants {
closer = null;
if (brokers != null) {
- for (KafkaServerStartable broker : brokers) {
- broker.shutdown();
- }
+ brokers.forEach(KafkaServer::shutdown);
brokers = null;
}
if (zkHelper != null) {
@@ -116,12 +118,10 @@ public class EmbeddedKafkaCluster implements TestQueryConstants {
}
public void shutDownBroker(int brokerId) {
- for (KafkaServerStartable broker : brokers) {
- if (Integer.parseInt(broker.staticServerConfig().getString(KafkaConfig.BrokerIdProp())) == brokerId) {
- broker.shutdown();
- return;
- }
- }
+ brokers.stream()
+ .filter(broker -> Integer.parseInt(broker.config().getString(KafkaConfig.BrokerIdProp())) == brokerId)
+ .findAny()
+ .ifPresent(KafkaServer::shutdown);
}
public Properties getProps() {
@@ -130,11 +130,11 @@ public class EmbeddedKafkaCluster implements TestQueryConstants {
return tmpProps;
}
- public List<KafkaServerStartable> getBrokers() {
+ public List<KafkaServer> getBrokers() {
return brokers;
}
- public void setBrokers(List<KafkaServerStartable> brokers) {
+ public void setBrokers(List<KafkaServer> brokers) {
this.brokers = brokers;
}
@@ -143,13 +143,10 @@ public class EmbeddedKafkaCluster implements TestQueryConstants {
}
public String getKafkaBrokerList() {
- StringBuilder sb = new StringBuilder();
- for (KafkaServerStartable broker : brokers) {
- KafkaConfig serverConfig = broker.staticServerConfig();
- sb.append(serverConfig.hostName()).append(":").append(serverConfig.port());
- sb.append(",");
- }
- return sb.toString().substring(0, sb.toString().length() - 1);
+ return brokers.stream()
+ .map(KafkaServer::config)
+ .map(serverConfig -> serverConfig.hostName() + ":" + serverConfig.port())
+ .collect(Collectors.joining(","));
}
public void registerToClose(AutoCloseable autoCloseable) {