This is an automated email from the ASF dual-hosted git repository.

xvrl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 65280a6  update kafka client version to 2.5.0 (#9902)
65280a6 is described below

commit 65280a6953a285142008d86a729dabb248dd5c29
Author: Xavier Léauté <[email protected]>
AuthorDate: Wed May 27 13:20:32 2020 -0700

    update kafka client version to 2.5.0 (#9902)
    
    - remove dependency on deprecated internal Kafka classes
    - keep LZ4 version in line with the version shipped with Kafka
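    
    The deprecated kafka.admin.AdminUtils / kafka.utils.ZkUtils calls map
    onto the public Admin client roughly as follows (a minimal sketch, not
    code from this commit; the broker address localhost:9092, the class and
    topic names, and the partition counts are illustrative placeholders):
    
        import java.util.Collections;
        import java.util.Properties;
        import org.apache.kafka.clients.admin.Admin;
        import org.apache.kafka.clients.admin.NewPartitions;
        import org.apache.kafka.clients.admin.NewTopic;
        
        public class AdminClientSketch
        {
          public static void main(String[] args) throws Exception
          {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
            // try-with-resources closes the admin client's connections when done
            try (Admin admin = Admin.create(props)) {
              // replaces AdminUtils.createTopic(zkUtils, ...):
              // topic "demo" with 2 partitions, replication factor 1
              admin.createTopics(
                  Collections.singletonList(new NewTopic("demo", 2, (short) 1))
              ).all().get();
              // replaces the AdminUtils replica-assignment + ZK path update:
              // grow "demo" to 3 partitions
              admin.createPartitions(
                  Collections.singletonMap("demo", NewPartitions.increaseTo(3))
              ).all().get();
            }
          }
        }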
---
 extensions-contrib/kafka-emitter/pom.xml           |  2 +-
 extensions-core/kafka-extraction-namespace/pom.xml | 14 ++++---
 extensions-core/kafka-indexing-service/pom.xml     |  9 ++++-
 .../kafka/supervisor/KafkaSupervisorTest.java      | 44 ++++++----------------
 .../druid/indexing/kafka/test/TestBroker.java      | 28 +++++++++++---
 integration-tests/docker-base/setup.sh             | 26 ++++++++-----
 licenses.yaml                                      |  6 +--
 pom.xml                                            |  7 +++-
 web-console/script/druid                           |  8 ++--
 9 files changed, 82 insertions(+), 62 deletions(-)

diff --git a/extensions-contrib/kafka-emitter/pom.xml b/extensions-contrib/kafka-emitter/pom.xml
index 8feed6e..1f69ae3 100644
--- a/extensions-contrib/kafka-emitter/pom.xml
+++ b/extensions-contrib/kafka-emitter/pom.xml
@@ -38,7 +38,7 @@
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
-      <version>0.10.2.2</version>
+      <version>${apache.kafka.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.druid</groupId>
diff --git a/extensions-core/kafka-extraction-namespace/pom.xml b/extensions-core/kafka-extraction-namespace/pom.xml
index a33468a..0b5f269 100644
--- a/extensions-core/kafka-extraction-namespace/pom.xml
+++ b/extensions-core/kafka-extraction-namespace/pom.xml
@@ -98,11 +98,6 @@
       <scope>provided</scope>
     </dependency>
     <dependency>
-      <groupId>org.apache.zookeeper</groupId>
-      <artifactId>zookeeper</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
       <scope>provided</scope>
@@ -136,6 +131,13 @@
       </exclusions>
     </dependency>
     <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+      <!-- Kafka brokers require ZooKeeper 3.5.x clients for testing -->
+      <version>3.5.8</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.12</artifactId>
       <version>${apache.kafka.version}</version>
@@ -176,7 +178,7 @@
     <dependency>
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-library</artifactId>
-      <version>2.12.7</version>
+      <version>2.12.10</version>
       <scope>test</scope>
     </dependency>
   </dependencies>
diff --git a/extensions-core/kafka-indexing-service/pom.xml b/extensions-core/kafka-indexing-service/pom.xml
index cfc50ff..1f6a64d 100644
--- a/extensions-core/kafka-indexing-service/pom.xml
+++ b/extensions-core/kafka-indexing-service/pom.xml
@@ -142,6 +142,13 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+      <!-- Kafka brokers require ZooKeeper 3.5.x clients for testing -->
+      <version>3.5.8</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.12</artifactId>
       <version>${apache.kafka.version}</version>
@@ -188,7 +195,7 @@
     <dependency>
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-library</artifactId>
-      <version>2.12.7</version>
+      <version>2.12.10</version>
       <scope>test</scope>
     </dependency>
     <dependency>
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 79514d2..086a9e9 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -27,10 +27,6 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import kafka.admin.AdminUtils;
-import kafka.admin.BrokerMetadata;
-import kafka.admin.RackAwareMode;
-import kafka.utils.ZkUtils;
 import org.apache.curator.test.TestingCluster;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.impl.DimensionSchema;
@@ -89,10 +85,12 @@ import org.apache.druid.segment.realtime.appenderator.DummyForInjectionAppendera
 import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.server.metrics.ExceptionCapturingServiceEmitter;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.security.JaasUtils;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.easymock.Capture;
@@ -111,8 +109,6 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import scala.Option;
-import scala.collection.Seq;
 
 import java.io.File;
 import java.io.IOException;
@@ -148,7 +144,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
   private static String kafkaHost;
   private static DataSchema dataSchema;
   private static int topicPostfix;
-  private static ZkUtils zkUtils;
 
   private final int numThreads;
 
@@ -202,8 +197,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
     kafkaHost = StringUtils.format("localhost:%d", kafkaServer.getPort());
 
     dataSchema = getDataSchema(DATASOURCE);
-
-    zkUtils = ZkUtils.apply(zkServer.getConnectString(), 30000, 30000, JaasUtils.isZkSecurityEnabled());
   }
 
   @Before
@@ -237,9 +230,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
 
     zkServer.stop();
     zkServer = null;
-
-    zkUtils.close();
-    zkUtils = null;
   }
 
   @Test
@@ -3242,8 +3232,12 @@ public class KafkaSupervisorTest extends EasyMockSupport
 
   private void addSomeEvents(int numEventsPerPartition) throws Exception
   {
-    //create topic manually
-    AdminUtils.createTopic(zkUtils, topic, NUM_PARTITIONS, 1, new Properties(), RackAwareMode.Enforced$.MODULE$);
+    // create topic manually
+    try (Admin admin = kafkaServer.newAdminClient()) {
+      admin.createTopics(
+          Collections.singletonList(new NewTopic(topic, NUM_PARTITIONS, (short) 1))
+      ).all().get();
+    }
 
     try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
       kafkaProducer.initTransactions();
@@ -3266,23 +3260,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
 
   private void addMoreEvents(int numEventsPerPartition, int num_partitions) throws Exception
   {
-    Seq<BrokerMetadata> brokerList = AdminUtils.getBrokerMetadatas(
-        zkUtils,
-        RackAwareMode.Enforced$.MODULE$,
-        Option.apply(zkUtils.getSortedBrokerList())
-    );
-    scala.collection.Map<Object, Seq<Object>> replicaAssignment = AdminUtils.assignReplicasToBrokers(
-        brokerList,
-        num_partitions,
-        1, 0, 0
-    );
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(
-        zkUtils,
-        topic,
-        replicaAssignment,
-        new Properties(),
-        true
-    );
+    try (Admin admin = kafkaServer.newAdminClient()) {
+      admin.createPartitions(Collections.singletonMap(topic, NewPartitions.increaseTo(num_partitions))).all().get();
+    }
 
     try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
       kafkaProducer.initTransactions();
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java
index 0fee75e..9d3f7c5 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java
@@ -25,6 +25,7 @@ import kafka.server.KafkaServer;
 import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
 import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -100,18 +101,30 @@ public class TestBroker implements Closeable
 
   public KafkaProducer<byte[], byte[]> newProducer()
   {
-    return new KafkaProducer(producerProperties());
+    return new KafkaProducer<>(producerProperties());
+  }
+
+  public Admin newAdminClient()
+  {
+    return Admin.create(adminClientProperties());
+  }
+
+  Map<String, Object> adminClientProperties()
+  {
+    final Map<String, Object> props = new HashMap<>();
+    commonClientProperties(props);
+    return props;
   }
 
   public KafkaConsumer<byte[], byte[]> newConsumer()
   {
-    return new KafkaConsumer(consumerProperties());
+    return new KafkaConsumer<>(consumerProperties());
   }
 
-  public Map<String, String> producerProperties()
+  public Map<String, Object> producerProperties()
   {
-    final Map<String, String> props = new HashMap<>();
-    props.put("bootstrap.servers", StringUtils.format("localhost:%d", getPort()));
+    final Map<String, Object> props = new HashMap<>();
+    commonClientProperties(props);
     props.put("key.serializer", ByteArraySerializer.class.getName());
     props.put("value.serializer", ByteArraySerializer.class.getName());
     props.put("acks", "all");
@@ -120,6 +133,11 @@ public class TestBroker implements Closeable
     return props;
   }
 
+  void commonClientProperties(Map<String, Object> props)
+  {
+    props.put("bootstrap.servers", StringUtils.format("localhost:%d", getPort()));
+  }
+
   public Map<String, Object> consumerProperties()
   {
     final Map<String, Object> props = KafkaConsumerConfigs.getConsumerProperties();
diff --git a/integration-tests/docker-base/setup.sh b/integration-tests/docker-base/setup.sh
index a6dc552..7a066ec 100644
--- a/integration-tests/docker-base/setup.sh
+++ b/integration-tests/docker-base/setup.sh
@@ -31,18 +31,26 @@ apt-get install -y mysql-server
 apt-get install -y supervisor
 
 # Zookeeper
-wget -q -O /tmp/zookeeper-3.4.14.tar.gz "https://apache.org/dist/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz"
-tar -xzf /tmp/zookeeper-3.4.14.tar.gz -C /usr/local
-cp /usr/local/zookeeper-3.4.14/conf/zoo_sample.cfg /usr/local/zookeeper-3.4.14/conf/zoo.cfg
-ln -s /usr/local/zookeeper-3.4.14 /usr/local/zookeeper
-rm /tmp/zookeeper-3.4.14.tar.gz
+
+#ZK_VERSION=3.5.8
+#ZK_TAR=apache-zookeeper-$ZK_VERSION-bin
+
+ZK_VERSION=3.4.14
+ZK_TAR=zookeeper-$ZK_VERSION
+
+wget -q -O /tmp/$ZK_TAR.tar.gz "https://apache.org/dist/zookeeper/zookeeper-$ZK_VERSION/$ZK_TAR.tar.gz"
+tar -xzf /tmp/$ZK_TAR.tar.gz -C /usr/local
+cp /usr/local/$ZK_TAR/conf/zoo_sample.cfg /usr/local/$ZK_TAR/conf/zoo.cfg
+ln -s /usr/local/$ZK_TAR /usr/local/zookeeper
+rm /tmp/$ZK_TAR.tar.gz
 
 # Kafka
 # Match the version to the Kafka client used by KafkaSupervisor
-wget -q -O /tmp/kafka_2.12-2.1.1.tgz "https://apache.org/dist/kafka/2.1.1/kafka_2.12-2.1.1.tgz"
-tar -xzf /tmp/kafka_2.12-2.1.1.tgz -C /usr/local
-ln -s /usr/local/kafka_2.12-2.1.1 /usr/local/kafka
-rm /tmp/kafka_2.12-2.1.1.tgz
+KAFKA_VERSION=2.5.0
+wget -q -O /tmp/kafka_2.12-$KAFKA_VERSION.tgz "https://apache.org/dist/kafka/$KAFKA_VERSION/kafka_2.12-$KAFKA_VERSION.tgz"
+tar -xzf /tmp/kafka_2.12-$KAFKA_VERSION.tgz -C /usr/local
+ln -s /usr/local/kafka_2.12-$KAFKA_VERSION /usr/local/kafka
+rm /tmp/kafka_2.12-$KAFKA_VERSION.tgz
 
 # Druid system user
 adduser --system --group --no-create-home druid \
diff --git a/licenses.yaml b/licenses.yaml
index 6680ade..0dd9acd 100644
--- a/licenses.yaml
+++ b/licenses.yaml
@@ -1975,7 +1975,7 @@ name: LZ4 Java
 license_category: binary
 module: java-core
 license_name: Apache License version 2.0
-version: 1.6.0
+version: 1.7.1
 libraries:
   - org.lz4: lz4-java
 
@@ -3251,7 +3251,7 @@ libraries:
 ---
 
 name: Apache Kafka
-version: 2.2.2
+version: 2.5.0
 license_category: binary
 module: extensions/druid-kafka-indexing-service
 license_name: Apache License version 2.0
@@ -4159,7 +4159,7 @@ name: Apache Kafka
 license_category: binary
 module: extensions/kafka-extraction-namespace
 license_name: Apache License version 2.0
-version: 2.2.2
+version: 2.5.0
 libraries:
   - org.apache.kafka: kafka_2.12
   - org.apache.kafka: kafka-clients
diff --git a/pom.xml b/pom.xml
index aeffceb..c3314c9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -78,7 +78,7 @@
         <aether.version>0.9.0.M2</aether.version>
         <apache.curator.version>4.3.0</apache.curator.version>
         <apache.curator.test.version>2.12.0</apache.curator.test.version>
-        <apache.kafka.version>2.2.2</apache.kafka.version>
+        <apache.kafka.version>2.5.0</apache.kafka.version>
         <apache.ranger.version>2.0.0</apache.ranger.version>
         <apache.ranger.gson.version>2.2.4</apache.ranger.gson.version>
         <avatica.version>1.15.0</avatica.version>
@@ -111,6 +111,9 @@
         <powermock.version>2.0.2</powermock.version>
         <aws.sdk.version>1.11.199</aws.sdk.version>
         <caffeine.version>2.8.0</caffeine.version>
+        <!-- Curator requires 3.4.x ZooKeeper clients to maintain compatibility with 3.4.x ZooKeeper servers.
+             If we upgrade to 3.5.x clients, Curator requires 3.5.x servers, which would break backwards compatibility;
+             see http://curator.apache.org/zk-compatibility.html -->
         <!-- When upgrading ZK, edit docs and integration tests as well (integration-tests/docker-base/setup.sh) -->
         <zookeeper.version>3.4.14</zookeeper.version>
         <checkerframework.version>2.5.7</checkerframework.version>
@@ -771,7 +774,7 @@
             <dependency>
                 <groupId>org.lz4</groupId>
                 <artifactId>lz4-java</artifactId>
-                <version>1.6.0</version>
+                <version>1.7.1</version>
             </dependency>
             <dependency>
                 <groupId>com.google.protobuf</groupId>
diff --git a/web-console/script/druid b/web-console/script/druid
index 96f67b8..767c3a7 100755
--- a/web-console/script/druid
+++ b/web-console/script/druid
@@ -58,13 +58,15 @@ function _download_zookeeper() {
   local dest="$1"
   local zk_version
   zk_version="$(_get_zookeeper_version)"
+  #zk_tar=apache-zookeeper-${zk_version}-bin # for zk 3.5.x
+  zk_tar=zookeeper-${zk_version} # for zk 3.4.x
 
   _log "Downloading zookeeper"
-  curl -s "https://archive.apache.org/dist/zookeeper/zookeeper-${zk_version}/zookeeper-${zk_version}.tar.gz" \
+  curl -s "https://archive.apache.org/dist/zookeeper/zookeeper-${zk_version}/$zk_tar.tar.gz" \
     | tar xz \
   && rm -rf "$dest" \
-  && mv "zookeeper-${zk_version}" "$dest" \
-  && rm -f "zookeeper-${zk_version}"
+  && mv "$zk_tar" "$dest" \
+  && rm -f "$zk_tar"
 }
 
 function _build_distribution() {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
