This is an automated email from the ASF dual-hosted git repository.
av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git
The following commit(s) were added to refs/heads/master by this push:
new a5c4f620 IGNITE-25203 Update Kafka dependency (#324)
a5c4f620 is described below
commit a5c4f62086f14e31a2defc4535112e6e2ae857e0
Author: Dmitry Werner <[email protected]>
AuthorDate: Fri Oct 17 16:59:19 2025 +0500
IGNITE-25203 Update Kafka dependency (#324)
bump to 3.9.1 version
---
modules/cdc-ext/pom.xml | 10 +++++++++-
modules/kafka-ext/pom.xml | 2 +-
.../org/apache/ignite/stream/kafka/TestKafkaBroker.java | 4 ++--
.../stream/kafka/connect/IgniteSinkConnectorTest.java | 9 ++++-----
.../stream/kafka/connect/IgniteSourceConnectorTest.java | 16 ++++++++++------
5 files changed, 26 insertions(+), 15 deletions(-)
diff --git a/modules/cdc-ext/pom.xml b/modules/cdc-ext/pom.xml
index 014db80c..e12e18ad 100644
--- a/modules/cdc-ext/pom.xml
+++ b/modules/cdc-ext/pom.xml
@@ -32,7 +32,7 @@
</parent>
<properties>
- <kafka.version>3.4.0</kafka.version>
+ <kafka.version>3.9.1</kafka.version>
</properties>
<artifactId>ignite-cdc-ext</artifactId>
@@ -152,6 +152,14 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-server-common</artifactId>
+ <version>${kafka.version}</version>
+ <classifier>test</classifier>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
diff --git a/modules/kafka-ext/pom.xml b/modules/kafka-ext/pom.xml
index f550a291..7843ff57 100644
--- a/modules/kafka-ext/pom.xml
+++ b/modules/kafka-ext/pom.xml
@@ -32,7 +32,7 @@
</parent>
<properties>
- <kafka.version>3.4.0</kafka.version>
+ <kafka.version>3.9.1</kafka.version>
</properties>
<artifactId>ignite-kafka-ext</artifactId>
diff --git
a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java
b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java
index 7d45f29b..f0461bc0 100644
---
a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java
+++
b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java
@@ -38,7 +38,7 @@ import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
/**
* Kafka Test Broker.
@@ -78,7 +78,7 @@ public class TestKafkaBroker {
try {
zkServer = new TestingServer(ZK_PORT, true);
kafkaCfg = new KafkaConfig(getKafkaConfig());
- kafkaSrv = TestUtils.createServer(kafkaCfg, new SystemTime());
+ kafkaSrv = TestUtils.createServer(kafkaCfg, Time.SYSTEM);
kafkaSrv.startup();
}
diff --git
a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
index 69231960..a7759d5f 100644
---
a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
+++
b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
@@ -35,8 +35,7 @@ import org.apache.ignite.stream.StreamSingleTupleExtractor;
import org.apache.ignite.stream.kafka.TestKafkaBroker;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.utils.Time;
import
org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
@@ -106,7 +105,7 @@ public class IgniteSinkConnectorTest extends
GridCommonAbstractTest {
AllConnectorClientConfigOverridePolicy allConnectorClientCfgOverridePlc
= new AllConnectorClientConfigOverridePolicy();
- worker = new Worker(WORKER_ID, new SystemTime(), new Plugins(props),
workerCfg, offBackingStore,
+ worker = new Worker(WORKER_ID, Time.SYSTEM, new Plugins(props),
workerCfg, offBackingStore,
allConnectorClientCfgOverridePlc);
worker.start();
@@ -145,7 +144,7 @@ public class IgniteSinkConnectorTest extends
GridCommonAbstractTest {
*/
@Test
public void testSinkPutsWithoutTransformation() throws Exception {
- Map<String, String> sinkProps = makeSinkProps(Utils.join(TOPICS, ","));
+ Map<String, String> sinkProps = makeSinkProps(String.join(",",
TOPICS));
sinkProps.remove(IgniteSinkConstants.SINGLE_TUPLE_EXTRACTOR_CLASS);
@@ -157,7 +156,7 @@ public class IgniteSinkConnectorTest extends
GridCommonAbstractTest {
*/
@Test
public void testSinkPutsWithTransformation() throws Exception {
- testSinkPuts(makeSinkProps(Utils.join(TOPICS, ",")), true);
+ testSinkPuts(makeSinkProps(String.join(",", TOPICS)), true);
}
/**
diff --git
a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java
b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java
index 858c369a..acc9f4ff 100644
---
a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java
+++
b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -41,8 +42,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.utils.Time;
import
org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
@@ -103,13 +103,17 @@ public class IgniteSourceConnectorTest extends
GridCommonAbstractTest {
Map<String, String> props = makeWorkerProps();
WorkerConfig workerCfg = new StandaloneConfig(props);
- MemoryOffsetBackingStore offBackingStore = new
MemoryOffsetBackingStore();
+ MemoryOffsetBackingStore offBackingStore = new
MemoryOffsetBackingStore() {
+ @Override public Set<Map<String, Object>>
connectorPartitions(String s) {
+ return Set.of();
+ }
+ };
offBackingStore.configure(workerCfg);
AllConnectorClientConfigOverridePolicy allConnectorClientCfgOverridePlc
= new AllConnectorClientConfigOverridePolicy();
- worker = new Worker(WORKER_ID, new SystemTime(), new Plugins(props),
workerCfg, offBackingStore,
+ worker = new Worker(WORKER_ID, Time.SYSTEM, new Plugins(props),
workerCfg, offBackingStore,
allConnectorClientCfgOverridePlc);
worker.start();
@@ -142,7 +146,7 @@ public class IgniteSourceConnectorTest extends
GridCommonAbstractTest {
*/
@Test
public void testEventsInjectedIntoKafkaWithoutFilter() throws Exception {
- Map<String, String> srcProps = makeSourceProps(Utils.join(TOPICS,
","));
+ Map<String, String> srcProps = makeSourceProps(String.join(",",
TOPICS));
srcProps.remove(IgniteSourceConstants.CACHE_FILTER_CLASS);
@@ -156,7 +160,7 @@ public class IgniteSourceConnectorTest extends
GridCommonAbstractTest {
*/
@Test
public void testEventsInjectedIntoKafka() throws Exception {
- doTest(makeSourceProps(Utils.join(TOPICS, ",")), true);
+ doTest(makeSourceProps(String.join(",", TOPICS)), true);
}
/**