This is an automated email from the ASF dual-hosted git repository.
rzo1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 863550541 STORM-4004 - Upgrade Kafka Clients to 3.6.0 (#3604)
863550541 is described below
commit 863550541b7d446720d0278cf1f4d5999375dd2c
Author: Richard Zowalla <[email protected]>
AuthorDate: Mon Dec 4 12:43:26 2023 +0100
STORM-4004 - Upgrade Kafka Clients to 3.6.0 (#3604)
* STORM-4004 - Upgrade Kafka Clients to 3.6.0
* STORM-4004 - Upgrade Kafka Clients to 3.6.0
* STORM-4004 - Fix license
---
DEPENDENCY-LICENSES | 7 +++++--
LICENSE-binary | 5 +++--
examples/storm-kafka-client-examples/pom.xml | 4 ++++
external/storm-kafka-client/pom.xml | 4 ++--
.../org/apache/storm/kafka/spout/KafkaSpoutConfig.java | 17 +++++++++++++++++
.../src/test/java/org/apache/storm/kafka/KafkaUnit.java | 5 ++---
.../java/org/apache/storm/kafka/bolt/KafkaBoltTest.java | 13 ++++++++++---
.../storm/kafka/spout/KafkaSpoutNullTupleTest.java | 1 +
pom.xml | 2 +-
9 files changed, 45 insertions(+), 13 deletions(-)
diff --git a/DEPENDENCY-LICENSES b/DEPENDENCY-LICENSES
index ffea76d71..c799bd2f9 100644
--- a/DEPENDENCY-LICENSES
+++ b/DEPENDENCY-LICENSES
@@ -130,7 +130,7 @@ List of third-party dependencies grouped by their license
type.
* Apache HttpCore (org.apache.httpcomponents:httpcore:4.4.16 -
http://hc.apache.org/httpcomponents-core-ga)
* Apache HttpCore NIO (org.apache.httpcomponents:httpcore-nio:4.4.15 -
http://hc.apache.org/httpcomponents-core-ga)
* Apache Ivy (org.apache.ivy:ivy:2.4.0 - http://ant.apache.org/ivy/)
- * Apache Kafka (org.apache.kafka:kafka-clients:0.11.0.3 -
http://kafka.apache.org)
+ * Apache Kafka (org.apache.kafka:kafka-clients:3.6.0 -
https://kafka.apache.org)
* Apache Maven Artifact Transfer
(org.apache.maven.shared:maven-artifact-transfer:0.9.1 -
https://maven.apache.org/shared/maven-artifact-transfer/)
* Apache Maven Common Artifact Filters
(org.apache.maven.shared:maven-common-artifact-filters:3.0.1 -
https://maven.apache.org/shared/maven-common-artifact-filters/)
* Apache Maven Dependency Tree
(org.apache.maven.shared:maven-dependency-tree:2.2 -
http://maven.apache.org/shared/maven-dependency-tree/)
@@ -319,7 +319,6 @@ List of third-party dependencies grouped by their license
type.
* Lucene Sandbox (org.apache.lucene:lucene-sandbox:8.11.1 -
https://lucene.apache.org/lucene-parent/lucene-sandbox)
* Lucene Spatial 3D (org.apache.lucene:lucene-spatial3d:8.11.1 -
https://lucene.apache.org/lucene-parent/lucene-spatial3d)
* Lucene Suggest (org.apache.lucene:lucene-suggest:8.11.1 -
https://lucene.apache.org/lucene-parent/lucene-suggest)
- * LZ4 and xxHash (net.jpountz.lz4:lz4:1.3.0 -
https://github.com/jpountz/lz4-java)
* LZ4 and xxHash (org.lz4:lz4-java:1.8.0 -
https://github.com/lz4/lz4-java)
* Maven Aether Provider (org.apache.maven:maven-aether-provider:3.0 -
http://maven.apache.org/maven-aether-provider/)
* Maven Artifact (org.apache.maven:maven-artifact:3.0 -
http://maven.apache.org/maven-artifact/)
@@ -516,6 +515,10 @@ List of third-party dependencies grouped by their license
type.
* dnsjava (dnsjava:dnsjava:2.1.7 - http://www.dnsjava.org)
+ BSD 2-Clause License
+
+ * zstd-jni (com.github.luben:zstd-jni:1.5.5-1 -
https://github.com/luben/zstd-jni)
+
BSD-3-Clause
* asm (org.ow2.asm:asm:9.3 - http://asm.ow2.io/)
diff --git a/LICENSE-binary b/LICENSE-binary
index 3d10d5dac..2ceb9202d 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -737,7 +737,7 @@ The license texts of these dependencies can be found in the
licenses directory.
* Apache HttpClient (org.apache.httpcomponents:httpclient:4.5.14 -
http://hc.apache.org/httpcomponents-client-ga)
* Apache HttpCore (org.apache.httpcomponents:httpcore:4.4.16 -
http://hc.apache.org/httpcomponents-core-ga)
* Apache Ivy (org.apache.ivy:ivy:2.4.0 - http://ant.apache.org/ivy/)
- * Apache Kafka (org.apache.kafka:kafka-clients:0.11.0.3 -
http://kafka.apache.org)
+ * Apache Kafka (org.apache.kafka:kafka-clients:3.6.0 -
https://kafka.apache.org)
* Apache Log4j 1.x Compatibility API
(org.apache.logging.log4j:log4j-1.2-api:2.21.1 -
https://logging.apache.org/log4j/2.x/log4j-1.2-api/)
* Apache Log4j API (org.apache.logging.log4j:log4j-api:2.21.1 -
https://logging.apache.org/log4j/2.x/log4j-api/)
* Apache Log4j Core (org.apache.logging.log4j:log4j-core:2.21.1 -
https://logging.apache.org/log4j/2.x/log4j-core/)
@@ -886,7 +886,7 @@ The license texts of these dependencies can be found in the
licenses directory.
* Kerby PKIX Project (org.apache.kerby:kerby-pkix:1.0.1 -
http://directory.apache.org/kerby/kerby-pkix)
* Kerby Util (org.apache.kerby:kerby-util:1.0.1 -
http://directory.apache.org/kerby/kerby-common/kerby-util)
* Kerby XDR Project (org.apache.kerby:kerby-xdr:1.0.1 -
http://directory.apache.org/kerby/kerby-common/kerby-xdr)
- * LZ4 and xxHash (net.jpountz.lz4:lz4:1.3.0 -
https://github.com/jpountz/lz4-java)
+ * LZ4 and xxHash (org.lz4:lz4-java:1.8.0 -
https://github.com/lz4/lz4-java)
* Maven Artifact (org.apache.maven:maven-artifact:3.6.0 -
https://maven.apache.org/ref/3.6.0/maven-artifact/)
* Maven Artifact Resolver API
(org.apache.maven.resolver:maven-resolver-api:1.3.3 -
https://maven.apache.org/resolver/maven-resolver-api/)
* Maven Artifact Resolver Connector Basic
(org.apache.maven.resolver:maven-resolver-connector-basic:1.3.3 -
https://maven.apache.org/resolver/maven-resolver-connector-basic/)
@@ -1030,6 +1030,7 @@ The license texts of these dependencies can be found in
the licenses directory.
BSD 2-Clause license
* dnsjava (dnsjava:dnsjava:2.1.7 - http://www.dnsjava.org)
+ * zstd-jni (com.github.luben:zstd-jni:1.5.5-1 -
https://github.com/luben/zstd-jni)
BSD-3-Clause
diff --git a/examples/storm-kafka-client-examples/pom.xml
b/examples/storm-kafka-client-examples/pom.xml
index 6d3273642..61abc1e2c 100644
--- a/examples/storm-kafka-client-examples/pom.xml
+++ b/examples/storm-kafka-client-examples/pom.xml
@@ -30,6 +30,10 @@
<artifactId>storm-kafka-client-examples</artifactId>
<dependencies>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-client</artifactId>
diff --git a/external/storm-kafka-client/pom.xml
b/external/storm-kafka-client/pom.xml
index 36a4055d7..2b70874ca 100644
--- a/external/storm-kafka-client/pom.xml
+++ b/external/storm-kafka-client/pom.xml
@@ -110,7 +110,7 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.11</artifactId>
+ <artifactId>kafka_2.13</artifactId>
<version>${storm.kafka.client.version}</version>
<classifier>test</classifier>
<scope>test</scope>
@@ -130,7 +130,7 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.11</artifactId>
+ <artifactId>kafka_2.13</artifactId>
<version>${storm.kafka.client.version}</version>
<scope>test</scope>
<exclusions>
diff --git
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index 18010378d..cc2efc475 100644
---
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -82,6 +82,7 @@ public class KafkaSpoutConfig<K, V> extends
CommonKafkaSpoutConfig<K, V> {
this.processingGuarantee = builder.processingGuarantee;
this.tupleTrackingEnforced = builder.tupleTrackingEnforced;
this.metricsTimeBucketSizeInSecs = builder.metricsTimeBucketSizeInSecs;
+ this.setConsumerGroupId(builder.groupId);
}
/**
@@ -122,6 +123,7 @@ public class KafkaSpoutConfig<K, V> extends
CommonKafkaSpoutConfig<K, V> {
private ProcessingGuarantee processingGuarantee =
DEFAULT_PROCESSING_GUARANTEE;
private boolean tupleTrackingEnforced = false;
private int metricsTimeBucketSizeInSecs =
DEFAULT_METRICS_TIME_BUCKET_SIZE_SECONDS;
+ private String groupId;
public Builder(String bootstrapServers, String... topics) {
super(bootstrapServers, topics);
@@ -160,6 +162,15 @@ public class KafkaSpoutConfig<K, V> extends
CommonKafkaSpoutConfig<K, V> {
return this;
}
+ /**
+ * Specifies the group id.
+ * @param groupId the group id
+ */
+ public Builder<K, V> setGroupId(String groupId) {
+ this.groupId = groupId;
+ return this;
+ }
+
/**
* Defines the max number of polled offsets (records) that can be
pending commit, before another poll can take place.
* Once this limit is reached, no more offsets (records) can be polled
until the next successful commit(s) sets the number
@@ -348,6 +359,12 @@ public class KafkaSpoutConfig<K, V> extends
CommonKafkaSpoutConfig<K, V> {
return (String) getKafkaProps().get(ConsumerConfig.GROUP_ID_CONFIG);
}
+ public void setConsumerGroupId(String groupId) {
+ if (groupId != null) {
+ getKafkaProps().put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ }
+ }
+
public int getMaxUncommittedOffsets() {
return maxUncommittedOffsets;
}
diff --git
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java
index 97d358db2..50dbdf84a 100644
---
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java
+++
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java
@@ -29,7 +29,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
-import kafka.utils.MockTime;
import kafka.utils.TestUtils;
import org.apache.curator.test.TestingServer;
import org.apache.kafka.clients.admin.AdminClient;
@@ -37,6 +36,7 @@ import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.utils.MockTime;
import org.apache.storm.testing.TmpPath;
public class KafkaUnit {
@@ -64,8 +64,7 @@ public class KafkaUnit {
brokerProps.setProperty("listeners",
String.format("PLAINTEXT://%s:%d", KAFKA_HOST, KAFKA_PORT));
brokerProps.setProperty("offsets.topic.replication.factor", "1");
KafkaConfig config = new KafkaConfig(brokerProps);
- MockTime mock = new MockTime();
- kafkaServer = TestUtils.createServer(config, mock);
+ kafkaServer = TestUtils.createServer(config, new MockTime());
// setup default Producer
createProducer();
diff --git
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
index 0b0a64ed3..63b98eb6a 100644
---
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
+++
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.storm.Testing;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
@@ -66,7 +67,9 @@ public class KafkaBoltTest {
@Test
public void testSimple() {
- MockProducer<String, String> producer = new
MockProducer<>(Cluster.empty(), false, null, null, null);
+ MockProducer<String, String> producer = new MockProducer<>(
+ Cluster.empty(), false,
+ null, new StringSerializer(), new StringSerializer());
KafkaBolt<String, String> bolt = makeBolt(producer);
OutputCollector collector = mock(OutputCollector.class);
@@ -95,7 +98,9 @@ public class KafkaBoltTest {
@Test
public void testSimpleWithError() {
- MockProducer<String, String> producer = new
MockProducer<>(Cluster.empty(), false, null, null, null);
+ MockProducer<String, String> producer = new MockProducer<>(
+ Cluster.empty(), false,
+ null, new StringSerializer(), new StringSerializer());
KafkaBolt<String, String> bolt = makeBolt(producer);
OutputCollector collector = mock(OutputCollector.class);
@@ -126,7 +131,9 @@ public class KafkaBoltTest {
@Test
public void testCustomCallbackIsWrappedByDefaultCallbackBehavior() {
- MockProducer<String, String> producer = new
MockProducer<>(Cluster.empty(), false, null, null, null);
+ MockProducer<String, String> producer = new MockProducer<>(
+ Cluster.empty(), false,
+ null, new StringSerializer(), new StringSerializer());
KafkaBolt<String, String> bolt = makeBolt(producer);
PreparableCallback customCallback = mock(PreparableCallback.class);
diff --git
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java
index 6d520304c..e6dee2e1c 100644
---
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java
+++
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java
@@ -39,6 +39,7 @@ public class KafkaSpoutNullTupleTest extends
KafkaSpoutAbstractTest {
KafkaSpoutConfig<String, String> createSpoutConfig() {
return KafkaSpoutConfig.builder("127.0.0.1:" +
kafkaUnitExtension.getKafkaUnit().getKafkaPort(),
Pattern.compile(SingleTopicKafkaSpoutConfiguration.TOPIC))
+ .setGroupId("test")
.setOffsetCommitPeriodMs(commitOffsetPeriodMs)
.setRecordTranslator(new NullRecordTranslator<>())
.build();
diff --git a/pom.xml b/pom.xml
index 5fba628ff..0ea3d7221 100644
--- a/pom.xml
+++ b/pom.xml
@@ -133,7 +133,7 @@
<jackson.version>2.15.2</jackson.version>
<jackson.databind.version>2.15.2</jackson.databind.version>
- <storm.kafka.client.version>0.11.0.3</storm.kafka.client.version>
+ <storm.kafka.client.version>3.6.0</storm.kafka.client.version>
<testcontainers.version>1.19.1</testcontainers.version>
<!-- Java and clojure build lifecycle test properties are defined here
to avoid having to create a default profile -->