This is an automated email from the ASF dual-hosted git repository.
xvrl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 65280a6 update kafka client version to 2.5.0 (#9902)
65280a6 is described below
commit 65280a6953a285142008d86a729dabb248dd5c29
Author: Xavier Léauté <[email protected]>
AuthorDate: Wed May 27 13:20:32 2020 -0700
update kafka client version to 2.5.0 (#9902)
- remove dependency on deprecated internal Kafka classes
- keep LZ4 version in line with the version shipped with Kafka
---
extensions-contrib/kafka-emitter/pom.xml | 2 +-
extensions-core/kafka-extraction-namespace/pom.xml | 14 ++++---
extensions-core/kafka-indexing-service/pom.xml | 9 ++++-
.../kafka/supervisor/KafkaSupervisorTest.java | 44 ++++++----------------
.../druid/indexing/kafka/test/TestBroker.java | 28 +++++++++++---
integration-tests/docker-base/setup.sh | 26 ++++++++-----
licenses.yaml | 6 +--
pom.xml | 7 +++-
web-console/script/druid | 8 ++--
9 files changed, 82 insertions(+), 62 deletions(-)
diff --git a/extensions-contrib/kafka-emitter/pom.xml
b/extensions-contrib/kafka-emitter/pom.xml
index 8feed6e..1f69ae3 100644
--- a/extensions-contrib/kafka-emitter/pom.xml
+++ b/extensions-contrib/kafka-emitter/pom.xml
@@ -38,7 +38,7 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
- <version>0.10.2.2</version>
+ <version>${apache.kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
diff --git a/extensions-core/kafka-extraction-namespace/pom.xml
b/extensions-core/kafka-extraction-namespace/pom.xml
index a33468a..0b5f269 100644
--- a/extensions-core/kafka-extraction-namespace/pom.xml
+++ b/extensions-core/kafka-extraction-namespace/pom.xml
@@ -98,11 +98,6 @@
<scope>provided</scope>
</dependency>
<dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<scope>provided</scope>
@@ -136,6 +131,13 @@
</exclusions>
</dependency>
<dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <!-- Kafka brokers require ZooKeeper 3.5.x clients for testing -->
+ <version>3.5.8</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>${apache.kafka.version}</version>
@@ -176,7 +178,7 @@
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
- <version>2.12.7</version>
+ <version>2.12.10</version>
<scope>test</scope>
</dependency>
</dependencies>
diff --git a/extensions-core/kafka-indexing-service/pom.xml
b/extensions-core/kafka-indexing-service/pom.xml
index cfc50ff..1f6a64d 100644
--- a/extensions-core/kafka-indexing-service/pom.xml
+++ b/extensions-core/kafka-indexing-service/pom.xml
@@ -142,6 +142,13 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <!-- Kafka brokers require ZooKeeper 3.5.x clients for testing -->
+ <version>3.5.8</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>${apache.kafka.version}</version>
@@ -188,7 +195,7 @@
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
- <version>2.12.7</version>
+ <version>2.12.10</version>
<scope>test</scope>
</dependency>
<dependency>
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 79514d2..086a9e9 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -27,10 +27,6 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import kafka.admin.AdminUtils;
-import kafka.admin.BrokerMetadata;
-import kafka.admin.RackAwareMode;
-import kafka.utils.ZkUtils;
import org.apache.curator.test.TestingCluster;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.impl.DimensionSchema;
@@ -89,10 +85,12 @@ import
org.apache.druid.segment.realtime.appenderator.DummyForInjectionAppendera
import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.server.metrics.ExceptionCapturingServiceEmitter;
import org.apache.druid.server.metrics.NoopServiceEmitter;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.easymock.Capture;
@@ -111,8 +109,6 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import scala.Option;
-import scala.collection.Seq;
import java.io.File;
import java.io.IOException;
@@ -148,7 +144,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
private static String kafkaHost;
private static DataSchema dataSchema;
private static int topicPostfix;
- private static ZkUtils zkUtils;
private final int numThreads;
@@ -202,8 +197,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
kafkaHost = StringUtils.format("localhost:%d", kafkaServer.getPort());
dataSchema = getDataSchema(DATASOURCE);
-
- zkUtils = ZkUtils.apply(zkServer.getConnectString(), 30000, 30000,
JaasUtils.isZkSecurityEnabled());
}
@Before
@@ -237,9 +230,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
zkServer.stop();
zkServer = null;
-
- zkUtils.close();
- zkUtils = null;
}
@Test
@@ -3242,8 +3232,12 @@ public class KafkaSupervisorTest extends EasyMockSupport
private void addSomeEvents(int numEventsPerPartition) throws Exception
{
- //create topic manually
- AdminUtils.createTopic(zkUtils, topic, NUM_PARTITIONS, 1, new
Properties(), RackAwareMode.Enforced$.MODULE$);
+ // create topic manually
+ try (Admin admin = kafkaServer.newAdminClient()) {
+ admin.createTopics(
+ Collections.singletonList(new NewTopic(topic, NUM_PARTITIONS,
(short) 1))
+ ).all().get();
+ }
try (final KafkaProducer<byte[], byte[]> kafkaProducer =
kafkaServer.newProducer()) {
kafkaProducer.initTransactions();
@@ -3266,23 +3260,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
private void addMoreEvents(int numEventsPerPartition, int num_partitions)
throws Exception
{
- Seq<BrokerMetadata> brokerList = AdminUtils.getBrokerMetadatas(
- zkUtils,
- RackAwareMode.Enforced$.MODULE$,
- Option.apply(zkUtils.getSortedBrokerList())
- );
- scala.collection.Map<Object, Seq<Object>> replicaAssignment =
AdminUtils.assignReplicasToBrokers(
- brokerList,
- num_partitions,
- 1, 0, 0
- );
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(
- zkUtils,
- topic,
- replicaAssignment,
- new Properties(),
- true
- );
+ try (Admin admin = kafkaServer.newAdminClient()) {
+ admin.createPartitions(Collections.singletonMap(topic,
NewPartitions.increaseTo(num_partitions))).all().get();
+ }
try (final KafkaProducer<byte[], byte[]> kafkaProducer =
kafkaServer.newProducer()) {
kafkaProducer.initTransactions();
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java
index 0fee75e..9d3f7c5 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java
@@ -25,6 +25,7 @@ import kafka.server.KafkaServer;
import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -100,18 +101,30 @@ public class TestBroker implements Closeable
public KafkaProducer<byte[], byte[]> newProducer()
{
- return new KafkaProducer(producerProperties());
+ return new KafkaProducer<>(producerProperties());
+ }
+
+ public Admin newAdminClient()
+ {
+ return Admin.create(adminClientProperties());
+ }
+
+ Map<String, Object> adminClientProperties()
+ {
+ final Map<String, Object> props = new HashMap<>();
+ commonClientProperties(props);
+ return props;
}
public KafkaConsumer<byte[], byte[]> newConsumer()
{
- return new KafkaConsumer(consumerProperties());
+ return new KafkaConsumer<>(consumerProperties());
}
- public Map<String, String> producerProperties()
+ public Map<String, Object> producerProperties()
{
- final Map<String, String> props = new HashMap<>();
- props.put("bootstrap.servers", StringUtils.format("localhost:%d",
getPort()));
+ final Map<String, Object> props = new HashMap<>();
+ commonClientProperties(props);
props.put("key.serializer", ByteArraySerializer.class.getName());
props.put("value.serializer", ByteArraySerializer.class.getName());
props.put("acks", "all");
@@ -120,6 +133,11 @@ public class TestBroker implements Closeable
return props;
}
+ void commonClientProperties(Map<String, Object> props)
+ {
+ props.put("bootstrap.servers", StringUtils.format("localhost:%d",
getPort()));
+ }
+
public Map<String, Object> consumerProperties()
{
final Map<String, Object> props =
KafkaConsumerConfigs.getConsumerProperties();
diff --git a/integration-tests/docker-base/setup.sh
b/integration-tests/docker-base/setup.sh
index a6dc552..7a066ec 100644
--- a/integration-tests/docker-base/setup.sh
+++ b/integration-tests/docker-base/setup.sh
@@ -31,18 +31,26 @@ apt-get install -y mysql-server
apt-get install -y supervisor
# Zookeeper
-wget -q -O /tmp/zookeeper-3.4.14.tar.gz
"https://apache.org/dist/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz"
-tar -xzf /tmp/zookeeper-3.4.14.tar.gz -C /usr/local
-cp /usr/local/zookeeper-3.4.14/conf/zoo_sample.cfg
/usr/local/zookeeper-3.4.14/conf/zoo.cfg
-ln -s /usr/local/zookeeper-3.4.14 /usr/local/zookeeper
-rm /tmp/zookeeper-3.4.14.tar.gz
+
+#ZK_VERSION=3.5.8
+#ZK_TAR=apache-zookeeper-$ZK_VERSION-bin
+
+ZK_VERSION=3.4.14
+ZK_TAR=zookeeper-$ZK_VERSION
+
+wget -q -O /tmp/$ZK_TAR.tar.gz
"https://apache.org/dist/zookeeper/zookeeper-$ZK_VERSION/$ZK_TAR.tar.gz"
+tar -xzf /tmp/$ZK_TAR.tar.gz -C /usr/local
+cp /usr/local/$ZK_TAR/conf/zoo_sample.cfg /usr/local/$ZK_TAR/conf/zoo.cfg
+ln -s /usr/local/$ZK_TAR /usr/local/zookeeper
+rm /tmp/$ZK_TAR.tar.gz
# Kafka
# Match the version to the Kafka client used by KafkaSupervisor
-wget -q -O /tmp/kafka_2.12-2.1.1.tgz
"https://apache.org/dist/kafka/2.1.1/kafka_2.12-2.1.1.tgz"
-tar -xzf /tmp/kafka_2.12-2.1.1.tgz -C /usr/local
-ln -s /usr/local/kafka_2.12-2.1.1 /usr/local/kafka
-rm /tmp/kafka_2.12-2.1.1.tgz
+KAFKA_VERSION=2.5.0
+wget -q -O /tmp/kafka_2.12-$KAFKA_VERSION.tgz
"https://apache.org/dist/kafka/$KAFKA_VERSION/kafka_2.12-$KAFKA_VERSION.tgz"
+tar -xzf /tmp/kafka_2.12-$KAFKA_VERSION.tgz -C /usr/local
+ln -s /usr/local/kafka_2.12-$KAFKA_VERSION /usr/local/kafka
+rm /tmp/kafka_2.12-$KAFKA_VERSION.tgz
# Druid system user
adduser --system --group --no-create-home druid \
diff --git a/licenses.yaml b/licenses.yaml
index 6680ade..0dd9acd 100644
--- a/licenses.yaml
+++ b/licenses.yaml
@@ -1975,7 +1975,7 @@ name: LZ4 Java
license_category: binary
module: java-core
license_name: Apache License version 2.0
-version: 1.6.0
+version: 1.7.1
libraries:
- org.lz4: lz4-java
@@ -3251,7 +3251,7 @@ libraries:
---
name: Apache Kafka
-version: 2.2.2
+version: 2.5.0
license_category: binary
module: extensions/druid-kafka-indexing-service
license_name: Apache License version 2.0
@@ -4159,7 +4159,7 @@ name: Apache Kafka
license_category: binary
module: extensions/kafka-extraction-namespace
license_name: Apache License version 2.0
-version: 2.2.2
+version: 2.5.0
libraries:
- org.apache.kafka: kafka_2.12
- org.apache.kafka: kafka-clients
diff --git a/pom.xml b/pom.xml
index aeffceb..c3314c9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -78,7 +78,7 @@
<aether.version>0.9.0.M2</aether.version>
<apache.curator.version>4.3.0</apache.curator.version>
<apache.curator.test.version>2.12.0</apache.curator.test.version>
- <apache.kafka.version>2.2.2</apache.kafka.version>
+ <apache.kafka.version>2.5.0</apache.kafka.version>
<apache.ranger.version>2.0.0</apache.ranger.version>
<apache.ranger.gson.version>2.2.4</apache.ranger.gson.version>
<avatica.version>1.15.0</avatica.version>
@@ -111,6 +111,9 @@
<powermock.version>2.0.2</powermock.version>
<aws.sdk.version>1.11.199</aws.sdk.version>
<caffeine.version>2.8.0</caffeine.version>
+ <!-- Curator requires 3.4.x ZooKeeper clients to maintain
compatibility with 3.4.x ZooKeeper servers.
+ If we upgrade to 3.5.x clients, Curator requires 3.5.x servers,
which would break backwards compatibility;
+ see http://curator.apache.org/zk-compatibility.html -->
<!-- When upgrading ZK, edit docs and integration tests as well
(integration-tests/docker-base/setup.sh) -->
<zookeeper.version>3.4.14</zookeeper.version>
<checkerframework.version>2.5.7</checkerframework.version>
@@ -771,7 +774,7 @@
<dependency>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
- <version>1.6.0</version>
+ <version>1.7.1</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
diff --git a/web-console/script/druid b/web-console/script/druid
index 96f67b8..767c3a7 100755
--- a/web-console/script/druid
+++ b/web-console/script/druid
@@ -58,13 +58,15 @@ function _download_zookeeper() {
local dest="$1"
local zk_version
zk_version="$(_get_zookeeper_version)"
+ #zk_tar=apache-zookeeper-${zk_version}-bin # for zk 3.5.x
+ zk_tar=zookeeper-${zk_version} # for zk 3.4.x
_log "Downloading zookeeper"
- curl -s
"https://archive.apache.org/dist/zookeeper/zookeeper-${zk_version}/zookeeper-${zk_version}.tar.gz"
\
+ curl -s
"https://archive.apache.org/dist/zookeeper/zookeeper-${zk_version}/$zk_tar.tar.gz"
\
| tar xz \
&& rm -rf "$dest" \
- && mv "zookeeper-${zk_version}" "$dest" \
- && rm -f "zookeeper-${zk_version}"
+ && mv "$zk_tar" "$dest" \
+ && rm -f "$zk_tar"
}
function _build_distribution() {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]