This is an automated email from the ASF dual-hosted git repository.

rzo1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 863550541 STORM-4004 - Upgrade Kafka Clients to 3.6.0 (#3604)
863550541 is described below

commit 863550541b7d446720d0278cf1f4d5999375dd2c
Author: Richard Zowalla <[email protected]>
AuthorDate: Mon Dec 4 12:43:26 2023 +0100

    STORM-4004 - Upgrade Kafka Clients to 3.6.0 (#3604)
    
    * STORM-4004 - Upgrade Kafka Clients to 3.6.0
    
    * STORM-4004 - Upgrade Kafka Clients to 3.6.0
    
    * STORM-4004 - Fix license
---
 DEPENDENCY-LICENSES                                     |  7 +++++--
 LICENSE-binary                                          |  5 +++--
 examples/storm-kafka-client-examples/pom.xml            |  4 ++++
 external/storm-kafka-client/pom.xml                     |  4 ++--
 .../org/apache/storm/kafka/spout/KafkaSpoutConfig.java  | 17 +++++++++++++++++
 .../src/test/java/org/apache/storm/kafka/KafkaUnit.java |  5 ++---
 .../java/org/apache/storm/kafka/bolt/KafkaBoltTest.java | 13 ++++++++++---
 .../storm/kafka/spout/KafkaSpoutNullTupleTest.java      |  1 +
 pom.xml                                                 |  2 +-
 9 files changed, 45 insertions(+), 13 deletions(-)

diff --git a/DEPENDENCY-LICENSES b/DEPENDENCY-LICENSES
index ffea76d71..c799bd2f9 100644
--- a/DEPENDENCY-LICENSES
+++ b/DEPENDENCY-LICENSES
@@ -130,7 +130,7 @@ List of third-party dependencies grouped by their license type.
         * Apache HttpCore (org.apache.httpcomponents:httpcore:4.4.16 - http://hc.apache.org/httpcomponents-core-ga)
         * Apache HttpCore NIO (org.apache.httpcomponents:httpcore-nio:4.4.15 - http://hc.apache.org/httpcomponents-core-ga)
         * Apache Ivy (org.apache.ivy:ivy:2.4.0 - http://ant.apache.org/ivy/)
-        * Apache Kafka (org.apache.kafka:kafka-clients:0.11.0.3 - http://kafka.apache.org)
+        * Apache Kafka (org.apache.kafka:kafka-clients:3.6.0 - https://kafka.apache.org)
         * Apache Maven Artifact Transfer (org.apache.maven.shared:maven-artifact-transfer:0.9.1 - https://maven.apache.org/shared/maven-artifact-transfer/)
         * Apache Maven Common Artifact Filters (org.apache.maven.shared:maven-common-artifact-filters:3.0.1 - https://maven.apache.org/shared/maven-common-artifact-filters/)
         * Apache Maven Dependency Tree (org.apache.maven.shared:maven-dependency-tree:2.2 - http://maven.apache.org/shared/maven-dependency-tree/)
@@ -319,7 +319,6 @@ List of third-party dependencies grouped by their license type.
         * Lucene Sandbox (org.apache.lucene:lucene-sandbox:8.11.1 - https://lucene.apache.org/lucene-parent/lucene-sandbox)
         * Lucene Spatial 3D (org.apache.lucene:lucene-spatial3d:8.11.1 - https://lucene.apache.org/lucene-parent/lucene-spatial3d)
         * Lucene Suggest (org.apache.lucene:lucene-suggest:8.11.1 - https://lucene.apache.org/lucene-parent/lucene-suggest)
-        * LZ4 and xxHash (net.jpountz.lz4:lz4:1.3.0 - https://github.com/jpountz/lz4-java)
         * LZ4 and xxHash (org.lz4:lz4-java:1.8.0 - https://github.com/lz4/lz4-java)
         * Maven Aether Provider (org.apache.maven:maven-aether-provider:3.0 - http://maven.apache.org/maven-aether-provider/)
         * Maven Artifact (org.apache.maven:maven-artifact:3.0 - http://maven.apache.org/maven-artifact/)
@@ -516,6 +515,10 @@ List of third-party dependencies grouped by their license type.
 
         * dnsjava (dnsjava:dnsjava:2.1.7 - http://www.dnsjava.org)
 
+    BSD 2-Clause License
+
+        * zstd-jni (com.github.luben:zstd-jni:1.5.5-1 - https://github.com/luben/zstd-jni)
+
     BSD-3-Clause
 
         * asm (org.ow2.asm:asm:9.3 - http://asm.ow2.io/)
diff --git a/LICENSE-binary b/LICENSE-binary
index 3d10d5dac..2ceb9202d 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -737,7 +737,7 @@ The license texts of these dependencies can be found in the licenses directory.
         * Apache HttpClient (org.apache.httpcomponents:httpclient:4.5.14 - http://hc.apache.org/httpcomponents-client-ga)
         * Apache HttpCore (org.apache.httpcomponents:httpcore:4.4.16 - http://hc.apache.org/httpcomponents-core-ga)
         * Apache Ivy (org.apache.ivy:ivy:2.4.0 - http://ant.apache.org/ivy/)
-        * Apache Kafka (org.apache.kafka:kafka-clients:0.11.0.3 - http://kafka.apache.org)
+        * Apache Kafka (org.apache.kafka:kafka-clients:3.6.0 - https://kafka.apache.org)
         * Apache Log4j 1.x Compatibility API (org.apache.logging.log4j:log4j-1.2-api:2.21.1 - https://logging.apache.org/log4j/2.x/log4j-1.2-api/)
         * Apache Log4j API (org.apache.logging.log4j:log4j-api:2.21.1 - https://logging.apache.org/log4j/2.x/log4j-api/)
         * Apache Log4j Core (org.apache.logging.log4j:log4j-core:2.21.1 - https://logging.apache.org/log4j/2.x/log4j-core/)
@@ -886,7 +886,7 @@ The license texts of these dependencies can be found in the licenses directory.
         * Kerby PKIX Project (org.apache.kerby:kerby-pkix:1.0.1 - http://directory.apache.org/kerby/kerby-pkix)
         * Kerby Util (org.apache.kerby:kerby-util:1.0.1 - http://directory.apache.org/kerby/kerby-common/kerby-util)
         * Kerby XDR Project (org.apache.kerby:kerby-xdr:1.0.1 - http://directory.apache.org/kerby/kerby-common/kerby-xdr)
-        * LZ4 and xxHash (net.jpountz.lz4:lz4:1.3.0 - https://github.com/jpountz/lz4-java)
+        * LZ4 and xxHash (org.lz4:lz4-java:1.8.0 - https://github.com/lz4/lz4-java)
         * Maven Artifact (org.apache.maven:maven-artifact:3.6.0 - https://maven.apache.org/ref/3.6.0/maven-artifact/)
         * Maven Artifact Resolver API (org.apache.maven.resolver:maven-resolver-api:1.3.3 - https://maven.apache.org/resolver/maven-resolver-api/)
         * Maven Artifact Resolver Connector Basic (org.apache.maven.resolver:maven-resolver-connector-basic:1.3.3 - https://maven.apache.org/resolver/maven-resolver-connector-basic/)
@@ -1030,6 +1030,7 @@ The license texts of these dependencies can be found in the licenses directory.
     BSD 2-Clause license
 
         * dnsjava (dnsjava:dnsjava:2.1.7 - http://www.dnsjava.org)
+        * zstd-jni (com.github.luben:zstd-jni:1.5.5-1 - https://github.com/luben/zstd-jni)
 
     BSD-3-Clause
 
diff --git a/examples/storm-kafka-client-examples/pom.xml b/examples/storm-kafka-client-examples/pom.xml
index 6d3273642..61abc1e2c 100644
--- a/examples/storm-kafka-client-examples/pom.xml
+++ b/examples/storm-kafka-client-examples/pom.xml
@@ -30,6 +30,10 @@
     <artifactId>storm-kafka-client-examples</artifactId>
 
     <dependencies>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.apache.storm</groupId>
             <artifactId>storm-client</artifactId>
diff --git a/external/storm-kafka-client/pom.xml b/external/storm-kafka-client/pom.xml
index 36a4055d7..2b70874ca 100644
--- a/external/storm-kafka-client/pom.xml
+++ b/external/storm-kafka-client/pom.xml
@@ -110,7 +110,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.11</artifactId>
+            <artifactId>kafka_2.13</artifactId>
             <version>${storm.kafka.client.version}</version>
             <classifier>test</classifier>
             <scope>test</scope>
@@ -130,7 +130,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.11</artifactId>
+            <artifactId>kafka_2.13</artifactId>
             <version>${storm.kafka.client.version}</version>
             <scope>test</scope>
             <exclusions>
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index 18010378d..cc2efc475 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -82,6 +82,7 @@ public class KafkaSpoutConfig<K, V> extends CommonKafkaSpoutConfig<K, V> {
         this.processingGuarantee = builder.processingGuarantee;
         this.tupleTrackingEnforced = builder.tupleTrackingEnforced;
         this.metricsTimeBucketSizeInSecs = builder.metricsTimeBucketSizeInSecs;
+        this.setConsumerGroupId(builder.groupId);
     }
 
     /**
@@ -122,6 +123,7 @@ public class KafkaSpoutConfig<K, V> extends CommonKafkaSpoutConfig<K, V> {
         private ProcessingGuarantee processingGuarantee = DEFAULT_PROCESSING_GUARANTEE;
         private boolean tupleTrackingEnforced = false;
         private int metricsTimeBucketSizeInSecs = DEFAULT_METRICS_TIME_BUCKET_SIZE_SECONDS;
+        private String groupId;
 
         public Builder(String bootstrapServers, String... topics) {
             super(bootstrapServers, topics);
@@ -160,6 +162,15 @@ public class KafkaSpoutConfig<K, V> extends CommonKafkaSpoutConfig<K, V> {
             return this;
         }
 
+        /**
+         * Specifies the group id.
+         * @param groupId the group id
+         */
+        public Builder<K, V> setGroupId(String groupId) {
+            this.groupId = groupId;
+            return this;
+        }
+
         /**
          * Defines the max number of polled offsets (records) that can be pending commit, before another poll can take place.
          * Once this limit is reached, no more offsets (records) can be polled until the next successful commit(s) sets the number
@@ -348,6 +359,12 @@ public class KafkaSpoutConfig<K, V> extends CommonKafkaSpoutConfig<K, V> {
         return (String) getKafkaProps().get(ConsumerConfig.GROUP_ID_CONFIG);
     }
 
+    public void setConsumerGroupId(String groupId) {
+        if (groupId != null) {
+            getKafkaProps().put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        }
+    }
+
     public int getMaxUncommittedOffsets() {
         return maxUncommittedOffsets;
     }
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java
index 97d358db2..50dbdf84a 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java
@@ -29,7 +29,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
-import kafka.utils.MockTime;
 import kafka.utils.TestUtils;
 import org.apache.curator.test.TestingServer;
 import org.apache.kafka.clients.admin.AdminClient;
@@ -37,6 +36,7 @@ import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.storm.testing.TmpPath;
 
 public class KafkaUnit {
@@ -64,8 +64,7 @@ public class KafkaUnit {
         brokerProps.setProperty("listeners", String.format("PLAINTEXT://%s:%d", KAFKA_HOST, KAFKA_PORT));
         brokerProps.setProperty("offsets.topic.replication.factor", "1");
         KafkaConfig config = new KafkaConfig(brokerProps);
-        MockTime mock = new MockTime();
-        kafkaServer = TestUtils.createServer(config, mock);
+        kafkaServer = TestUtils.createServer(config, new MockTime());
 
         // setup default Producer
         createProducer();
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
index 0b0a64ed3..63b98eb6a 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.storm.Testing;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
@@ -66,7 +67,9 @@ public class KafkaBoltTest {
 
     @Test
     public void testSimple() {
-        MockProducer<String, String> producer = new MockProducer<>(Cluster.empty(), false, null, null, null);
+        MockProducer<String, String> producer = new MockProducer<>(
+                Cluster.empty(), false,
+                null, new StringSerializer(), new StringSerializer());
         KafkaBolt<String, String> bolt = makeBolt(producer);
 
         OutputCollector collector = mock(OutputCollector.class);
@@ -95,7 +98,9 @@ public class KafkaBoltTest {
 
     @Test
     public void testSimpleWithError() {
-        MockProducer<String, String> producer = new MockProducer<>(Cluster.empty(), false, null, null, null);
+        MockProducer<String, String> producer = new MockProducer<>(
+                Cluster.empty(), false,
+                null, new StringSerializer(), new StringSerializer());
         KafkaBolt<String, String> bolt = makeBolt(producer);
 
         OutputCollector collector = mock(OutputCollector.class);
@@ -126,7 +131,9 @@ public class KafkaBoltTest {
 
     @Test
     public void testCustomCallbackIsWrappedByDefaultCallbackBehavior() {
-        MockProducer<String, String> producer = new MockProducer<>(Cluster.empty(), false, null, null, null);
+        MockProducer<String, String> producer = new MockProducer<>(
+                Cluster.empty(), false,
+                null, new StringSerializer(), new StringSerializer());
         KafkaBolt<String, String> bolt = makeBolt(producer);
 
         PreparableCallback customCallback = mock(PreparableCallback.class);
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java
index 6d520304c..e6dee2e1c 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java
@@ -39,6 +39,7 @@ public class KafkaSpoutNullTupleTest extends KafkaSpoutAbstractTest {
     KafkaSpoutConfig<String, String> createSpoutConfig() {
         return KafkaSpoutConfig.builder("127.0.0.1:" + kafkaUnitExtension.getKafkaUnit().getKafkaPort(),
                 Pattern.compile(SingleTopicKafkaSpoutConfiguration.TOPIC))
+                .setGroupId("test")
                 .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
                 .setRecordTranslator(new NullRecordTranslator<>())
                 .build();
diff --git a/pom.xml b/pom.xml
index 5fba628ff..0ea3d7221 100644
--- a/pom.xml
+++ b/pom.xml
@@ -133,7 +133,7 @@
 
         <jackson.version>2.15.2</jackson.version>
         <jackson.databind.version>2.15.2</jackson.databind.version>
-        <storm.kafka.client.version>0.11.0.3</storm.kafka.client.version>
+        <storm.kafka.client.version>3.6.0</storm.kafka.client.version>
         <testcontainers.version>1.19.1</testcontainers.version>
 
         <!-- Java and clojure build lifecycle test properties are defined here to avoid having to create a default profile -->

Reply via email to