This is an automated email from the ASF dual-hosted git repository.

av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git


The following commit(s) were added to refs/heads/master by this push:
     new a5c4f620 IGNITE-25203 Update Kafka dependency (#324)
a5c4f620 is described below

commit a5c4f62086f14e31a2defc4535112e6e2ae857e0
Author: Dmitry Werner <[email protected]>
AuthorDate: Fri Oct 17 16:59:19 2025 +0500

    IGNITE-25203 Update Kafka dependency (#324)
    
    bump to 3.9.1 version
---
 modules/cdc-ext/pom.xml                                  | 10 +++++++++-
 modules/kafka-ext/pom.xml                                |  2 +-
 .../org/apache/ignite/stream/kafka/TestKafkaBroker.java  |  4 ++--
 .../stream/kafka/connect/IgniteSinkConnectorTest.java    |  9 ++++-----
 .../stream/kafka/connect/IgniteSourceConnectorTest.java  | 16 ++++++++++------
 5 files changed, 26 insertions(+), 15 deletions(-)

diff --git a/modules/cdc-ext/pom.xml b/modules/cdc-ext/pom.xml
index 014db80c..e12e18ad 100644
--- a/modules/cdc-ext/pom.xml
+++ b/modules/cdc-ext/pom.xml
@@ -32,7 +32,7 @@
     </parent>
 
     <properties>
-        <kafka.version>3.4.0</kafka.version>
+        <kafka.version>3.9.1</kafka.version>
     </properties>
 
     <artifactId>ignite-cdc-ext</artifactId>
@@ -152,6 +152,14 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-server-common</artifactId>
+            <version>${kafka.version}</version>
+            <classifier>test</classifier>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
diff --git a/modules/kafka-ext/pom.xml b/modules/kafka-ext/pom.xml
index f550a291..7843ff57 100644
--- a/modules/kafka-ext/pom.xml
+++ b/modules/kafka-ext/pom.xml
@@ -32,7 +32,7 @@
     </parent>
 
     <properties>
-        <kafka.version>3.4.0</kafka.version>
+        <kafka.version>3.9.1</kafka.version>
     </properties>
 
     <artifactId>ignite-kafka-ext</artifactId>
diff --git a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java
index 7d45f29b..f0461bc0 100644
--- a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java
+++ b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java
@@ -38,7 +38,7 @@ import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
 
 /**
  * Kafka Test Broker.
@@ -78,7 +78,7 @@ public class TestKafkaBroker {
         try {
             zkServer = new TestingServer(ZK_PORT, true);
             kafkaCfg = new KafkaConfig(getKafkaConfig());
-            kafkaSrv = TestUtils.createServer(kafkaCfg, new SystemTime());
+            kafkaSrv = TestUtils.createServer(kafkaCfg, Time.SYSTEM);
 
             kafkaSrv.startup();
         }
diff --git a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
index 69231960..a7759d5f 100644
--- a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
+++ b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
@@ -35,8 +35,7 @@ import org.apache.ignite.stream.StreamSingleTupleExtractor;
 import org.apache.ignite.stream.kafka.TestKafkaBroker;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
@@ -106,7 +105,7 @@ public class IgniteSinkConnectorTest extends GridCommonAbstractTest {
         AllConnectorClientConfigOverridePolicy allConnectorClientCfgOverridePlc
             = new AllConnectorClientConfigOverridePolicy();
 
-        worker = new Worker(WORKER_ID, new SystemTime(), new Plugins(props), workerCfg, offBackingStore,
+        worker = new Worker(WORKER_ID, Time.SYSTEM, new Plugins(props), workerCfg, offBackingStore,
             allConnectorClientCfgOverridePlc);
         worker.start();
 
@@ -145,7 +144,7 @@ public class IgniteSinkConnectorTest extends GridCommonAbstractTest {
      */
     @Test
     public void testSinkPutsWithoutTransformation() throws Exception {
-        Map<String, String> sinkProps = makeSinkProps(Utils.join(TOPICS, ","));
+        Map<String, String> sinkProps = makeSinkProps(String.join(",", TOPICS));
 
         sinkProps.remove(IgniteSinkConstants.SINGLE_TUPLE_EXTRACTOR_CLASS);
 
@@ -157,7 +156,7 @@ public class IgniteSinkConnectorTest extends GridCommonAbstractTest {
      */
     @Test
     public void testSinkPutsWithTransformation() throws Exception {
-        testSinkPuts(makeSinkProps(Utils.join(TOPICS, ",")), true);
+        testSinkPuts(makeSinkProps(String.join(",", TOPICS)), true);
     }
 
     /**
diff --git a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java
index 858c369a..acc9f4ff 100644
--- a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java
+++ b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -41,8 +42,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
@@ -103,13 +103,17 @@ public class IgniteSourceConnectorTest extends GridCommonAbstractTest {
         Map<String, String> props = makeWorkerProps();
         WorkerConfig workerCfg = new StandaloneConfig(props);
 
-        MemoryOffsetBackingStore offBackingStore = new MemoryOffsetBackingStore();
+        MemoryOffsetBackingStore offBackingStore = new MemoryOffsetBackingStore() {
+            @Override public Set<Map<String, Object>> connectorPartitions(String s) {
+                return Set.of();
+            }
+        };
         offBackingStore.configure(workerCfg);
 
         AllConnectorClientConfigOverridePolicy allConnectorClientCfgOverridePlc
             = new AllConnectorClientConfigOverridePolicy();
 
-        worker = new Worker(WORKER_ID, new SystemTime(), new Plugins(props), workerCfg, offBackingStore,
+        worker = new Worker(WORKER_ID, Time.SYSTEM, new Plugins(props), workerCfg, offBackingStore,
             allConnectorClientCfgOverridePlc);
         worker.start();
 
@@ -142,7 +146,7 @@ public class IgniteSourceConnectorTest extends GridCommonAbstractTest {
      */
     @Test
     public void testEventsInjectedIntoKafkaWithoutFilter() throws Exception {
-        Map<String, String> srcProps = makeSourceProps(Utils.join(TOPICS, ","));
+        Map<String, String> srcProps = makeSourceProps(String.join(",", TOPICS));
 
         srcProps.remove(IgniteSourceConstants.CACHE_FILTER_CLASS);
 
@@ -156,7 +160,7 @@ public class IgniteSourceConnectorTest extends GridCommonAbstractTest {
      */
     @Test
     public void testEventsInjectedIntoKafka() throws Exception {
-        doTest(makeSourceProps(Utils.join(TOPICS, ",")), true);
+        doTest(makeSourceProps(String.join(",", TOPICS)), true);
     }
 
     /**

Reply via email to