This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new d3a02cbfd8 fix(#3807): Update kafka version to 4.1.0 (#3808)
d3a02cbfd8 is described below

commit d3a02cbfd86751bd7549adec7bbf4d0994fb45e4
Author: Philipp Zehnder <[email protected]>
AuthorDate: Thu Oct 2 13:14:45 2025 +0200

    fix(#3807): Update kafka version to 4.1.0 (#3808)
    
    * fix(#3807): Update kafka version to 4.1.0
    
    * fix(#3807): Fix kafka integration test
---
 docker-compose.yml                                 | 30 ++++++++++++---------
 .../deploy/standalone/kafka/docker-compose.dev.yml | 28 ++++++++++---------
 .../cli/deploy/standalone/kafka/docker-compose.yml | 31 ++++++++++++----------
 installer/compose/docker-compose.full.yml          | 30 ++++++++++++---------
 installer/compose/docker-compose.yml               | 30 ++++++++++++---------
 streampipes-integration-tests/pom.xml              |  4 +--
 .../integration/adapters/KafkaAdapterTester.java   |  8 +++---
 .../integration/containers/KafkaDevContainer.java  |  2 +-
 ...fkaContainer.java => SpKafkaTestContainer.java} | 29 +++++++++-----------
 .../kafka/config/ConsumerConfigFactory.java        |  1 +
 10 files changed, 103 insertions(+), 90 deletions(-)

diff --git a/docker-compose.yml b/docker-compose.yml
index 6135ce4f3e..a8659193b1 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -81,22 +81,26 @@ services:
       spnet:
 
   kafka:
-    image: bitnami/kafka:3.9.0
+    image: apache/kafka:4.1.0
     hostname: kafka
     environment:
-      - KAFKA_CFG_NODE_ID=0
-      - KAFKA_CFG_PROCESS_ROLES=controller,broker
-      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
-      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,OUTSIDE://:9094
-      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,OUTSIDE://localhost:9094
-      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
-      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
-      - KAFKA_CFG_MESSAGE_MAX_BYTES=5000012
-      - KAFKA_CFG_FETCH_MESSAGE_MAX_BYTES=5000012
-      - KAFKA_CFG_REPLICA_FETCH_MAX_BYTES=10000000
-      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
+      - KAFKA_NODE_ID=0
+      - KAFKA_PROCESS_ROLES=controller,broker
+      - KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
+      - KAFKA_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
+      - KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,OUTSIDE://:9094
+      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,OUTSIDE://localhost:9094
+      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
+      - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
+      - KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER
+      - KAFKA_MESSAGE_MAX_BYTES=5000012
+      - KAFKA_REPLICA_FETCH_MAX_BYTES=10000000
+      - KAFKA_FETCH_MESSAGE_MAX_BYTES=5000012
+      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
+      - KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS=1
+      - KAFKA_DEFAULT_REPLICATION_FACTOR=1
     volumes:
-      - kafka3:/bitnami
+      - kafka3:/var/lib/kafka/data
     logging: *default-logging
     networks:
       spnet:
diff --git a/installer/cli/deploy/standalone/kafka/docker-compose.dev.yml b/installer/cli/deploy/standalone/kafka/docker-compose.dev.yml
index 8ac1f2e8ba..0f43f788dc 100644
--- a/installer/cli/deploy/standalone/kafka/docker-compose.dev.yml
+++ b/installer/cli/deploy/standalone/kafka/docker-compose.dev.yml
@@ -18,17 +18,19 @@ services:
     ports:
       - "9094:9094"
     environment:
-      # KRaft settings
-      - KAFKA_CFG_NODE_ID=0
-      - KAFKA_CFG_PROCESS_ROLES=controller,broker
-      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
-      # Listeners
-      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,OUTSIDE://:9094
-      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,OUTSIDE://localhost:9094
-      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
-      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
-      - KAFKA_CFG_MESSAGE_MAX_BYTES=5000012
-      - KAFKA_CFG_FETCH_MESSAGE_MAX_BYTES=5000012
-      - KAFKA_CFG_REPLICA_FETCH_MAX_BYTES=10000000
-      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
+      - KAFKA_NODE_ID=0
+      - KAFKA_PROCESS_ROLES=controller,broker
+      - KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
+      - KAFKA_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
+      - KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,OUTSIDE://:9094
+      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,OUTSIDE://localhost:9094
+      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
+      - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
+      - KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER
+      - KAFKA_MESSAGE_MAX_BYTES=5000012
+      - KAFKA_REPLICA_FETCH_MAX_BYTES=10000000
+      - KAFKA_FETCH_MESSAGE_MAX_BYTES=5000012
+      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
+      - KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS=1
+      - KAFKA_DEFAULT_REPLICATION_FACTOR=1
 
diff --git a/installer/cli/deploy/standalone/kafka/docker-compose.yml b/installer/cli/deploy/standalone/kafka/docker-compose.yml
index a4d689d2e1..f4a28f8c9c 100644
--- a/installer/cli/deploy/standalone/kafka/docker-compose.yml
+++ b/installer/cli/deploy/standalone/kafka/docker-compose.yml
@@ -15,23 +15,26 @@
 
 services:
   kafka:
-    image: bitnami/kafka:3.9.0
+    image: apache/kafka:4.1.0
     hostname: kafka
     environment:
-      - KAFKA_CFG_NODE_ID=0
-      - KAFKA_CFG_PROCESS_ROLES=controller,broker
-      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
-      # Listeners
-      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
-      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
-      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
-      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
-      - KAFKA_CFG_MESSAGE_MAX_BYTES=5000012
-      - KAFKA_CFG_FETCH_MESSAGE_MAX_BYTES=5000012
-      - KAFKA_CFG_REPLICA_FETCH_MAX_BYTES=10000000
-      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
+      - KAFKA_NODE_ID=0
+      - KAFKA_PROCESS_ROLES=controller,broker
+      - KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
+      - KAFKA_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
+      - KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,OUTSIDE://:9094
+      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,OUTSIDE://localhost:9094
+      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
+      - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
+      - KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER
+      - KAFKA_MESSAGE_MAX_BYTES=5000012
+      - KAFKA_REPLICA_FETCH_MAX_BYTES=10000000
+      - KAFKA_FETCH_MESSAGE_MAX_BYTES=5000012
+      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
+      - KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS=1
+      - KAFKA_DEFAULT_REPLICATION_FACTOR=1
     volumes:
-      - kafka3:/bitnami
+      - kafka3:/var/lib/kafka/data
     logging:
       driver: "json-file"
       options:
diff --git a/installer/compose/docker-compose.full.yml b/installer/compose/docker-compose.full.yml
index 65a2e1b652..aaa3a5613e 100644
--- a/installer/compose/docker-compose.full.yml
+++ b/installer/compose/docker-compose.full.yml
@@ -64,22 +64,26 @@ services:
       spnet:
 
   kafka:
-    image: bitnami/kafka:3.9.0
+    image: apache/kafka:4.1.0
     hostname: kafka
     environment:
-      - KAFKA_CFG_NODE_ID=0
-      - KAFKA_CFG_PROCESS_ROLES=controller,broker
-      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
-      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,OUTSIDE://:9094
-      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,OUTSIDE://localhost:9094
-      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
-      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
-      - KAFKA_CFG_MESSAGE_MAX_BYTES=5000012
-      - KAFKA_CFG_FETCH_MESSAGE_MAX_BYTES=5000012
-      - KAFKA_CFG_REPLICA_FETCH_MAX_BYTES=10000000
-      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
+      - KAFKA_NODE_ID=0
+      - KAFKA_PROCESS_ROLES=controller,broker
+      - KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
+      - KAFKA_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
+      - KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,OUTSIDE://:9094
+      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,OUTSIDE://localhost:9094
+      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
+      - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
+      - KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER
+      - KAFKA_MESSAGE_MAX_BYTES=5000012
+      - KAFKA_REPLICA_FETCH_MAX_BYTES=10000000
+      - KAFKA_FETCH_MESSAGE_MAX_BYTES=5000012
+      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
+      - KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS=1
+      - KAFKA_DEFAULT_REPLICATION_FACTOR=1
     volumes:
-      - kafka3:/bitnami
+      - kafka3:/var/lib/kafka/data
     logging: *default-logging
     restart: unless-stopped
     networks:
diff --git a/installer/compose/docker-compose.yml b/installer/compose/docker-compose.yml
index 338d138fcc..f0d00599db 100644
--- a/installer/compose/docker-compose.yml
+++ b/installer/compose/docker-compose.yml
@@ -64,22 +64,26 @@ services:
       spnet:
 
   kafka:
-    image: bitnami/kafka:3.9.0
+    image: apache/kafka:4.1.0
     hostname: kafka
     environment:
-      - KAFKA_CFG_NODE_ID=0
-      - KAFKA_CFG_PROCESS_ROLES=controller,broker
-      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
-      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,OUTSIDE://:9094
-      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,OUTSIDE://localhost:9094
-      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
-      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
-      - KAFKA_CFG_MESSAGE_MAX_BYTES=5000012
-      - KAFKA_CFG_FETCH_MESSAGE_MAX_BYTES=5000012
-      - KAFKA_CFG_REPLICA_FETCH_MAX_BYTES=10000000
-      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
+      - KAFKA_NODE_ID=0
+      - KAFKA_PROCESS_ROLES=controller,broker
+      - KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
+      - KAFKA_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
+      - KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,OUTSIDE://:9094
+      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,OUTSIDE://localhost:9094
+      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
+      - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
+      - KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER
+      - KAFKA_MESSAGE_MAX_BYTES=5000012
+      - KAFKA_REPLICA_FETCH_MAX_BYTES=10000000
+      - KAFKA_FETCH_MESSAGE_MAX_BYTES=5000012
+      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
+      - KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS=1
+      - KAFKA_DEFAULT_REPLICATION_FACTOR=1
     volumes:
-      - kafka3:/bitnami
+      - kafka3:/var/lib/kafka/data
     logging: *default-logging
     restart: unless-stopped
     networks:
diff --git a/streampipes-integration-tests/pom.xml b/streampipes-integration-tests/pom.xml
index 55e519b062..c182db6ba0 100644
--- a/streampipes-integration-tests/pom.xml
+++ b/streampipes-integration-tests/pom.xml
@@ -109,13 +109,13 @@
     <dependency>
       <groupId>org.testcontainers</groupId>
       <artifactId>testcontainers</artifactId>
-      <version>1.19.0</version>
+      <version>1.21.3</version>
       <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.testcontainers</groupId>
       <artifactId>kafka</artifactId>
-      <version>1.19.0</version>
+      <version>1.21.3</version>
     </dependency>
     <dependency>
       <groupId>org.mockito</groupId>
diff --git a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/KafkaAdapterTester.java b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/KafkaAdapterTester.java
index 803b2b1693..fd49dc1728 100644
--- a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/KafkaAdapterTester.java
+++ b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/KafkaAdapterTester.java
@@ -23,8 +23,8 @@ import org.apache.streampipes.extensions.api.connect.IAdapterConfiguration;
 import org.apache.streampipes.extensions.api.connect.StreamPipesAdapter;
 import org.apache.streampipes.extensions.connectors.kafka.adapter.KafkaProtocol;
 import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider;
-import org.apache.streampipes.integration.containers.KafkaContainer;
 import org.apache.streampipes.integration.containers.KafkaDevContainer;
+import org.apache.streampipes.integration.containers.SpKafkaTestContainer;
 import org.apache.streampipes.manager.template.AdapterTemplateHandler;
 import org.apache.streampipes.messaging.kafka.SpKafkaProducer;
 import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
@@ -44,7 +44,7 @@ import java.util.Objects;
 
 public class KafkaAdapterTester extends AdapterTesterBase {
 
-  KafkaContainer kafkaContainer;
+  SpKafkaTestContainer kafkaContainer;
 
   private static final String TOPIC = "test-topic";
 
@@ -53,7 +53,7 @@ public class KafkaAdapterTester extends AdapterTesterBase {
     if (Objects.equals(System.getenv("TEST_MODE"), "dev")) {
       kafkaContainer = new KafkaDevContainer();
     } else {
-      kafkaContainer = new KafkaContainer();
+      kafkaContainer = new SpKafkaTestContainer();
     }
 
     kafkaContainer.start();
@@ -144,7 +144,7 @@ public class KafkaAdapterTester extends AdapterTesterBase {
   }
 
   @Override
-  public void publishEvents(List<Map<String, Object>> events) throws Exception {
+  public void publishEvents(List<Map<String, Object>> events) {
     var publisher = getSpKafkaProducer();
     var objectMapper = new ObjectMapper();
 
diff --git a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/containers/KafkaDevContainer.java b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/containers/KafkaDevContainer.java
index fa04d3fc21..0274faa22c 100644
--- a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/containers/KafkaDevContainer.java
+++ b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/containers/KafkaDevContainer.java
@@ -18,7 +18,7 @@
 
 package org.apache.streampipes.integration.containers;
 
-public class KafkaDevContainer  extends KafkaContainer{
+public class KafkaDevContainer extends SpKafkaTestContainer {
   @Override
   public void start() {
     // do nothing
diff --git a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/containers/KafkaContainer.java b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/containers/SpKafkaTestContainer.java
similarity index 63%
rename from streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/containers/KafkaContainer.java
rename to streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/containers/SpKafkaTestContainer.java
index e9da5ad4a6..8b3ceec972 100644
--- a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/containers/KafkaContainer.java
+++ b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/containers/SpKafkaTestContainer.java
@@ -18,38 +18,33 @@
 
 package org.apache.streampipes.integration.containers;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.kafka.KafkaContainer;
 
 
-public class KafkaContainer extends org.testcontainers.containers.KafkaContainer {
+public class SpKafkaTestContainer {
 
-  Logger logger = LoggerFactory.getLogger(KafkaContainer.class);
+  private KafkaContainer kafka;
 
-  private static final int KAFKA_PORT = 9093;
-
-
-  public KafkaContainer() {
-    super(DockerImageName.parse("confluentinc/cp-kafka:7.9.1"));
+  public SpKafkaTestContainer() {
+    kafka = new KafkaContainer("apache/kafka");
   }
 
-
   public void start() {
-    super.start();
+    kafka.start();
   }
 
   public String getBrokerHost() {
-    return getHost();
+    return kafka.getHost();
   }
 
   public Integer getBrokerPort() {
-    return getMappedPort(KAFKA_PORT);
+    return kafka.getFirstMappedPort();
   }
 
-  public String getHttpUrl() {
-    return "http://" + getHost() + ":" + getBrokerPort();
+  public void stop() {
+    if (kafka != null) {
+      kafka.stop();
+    }
   }
 
-
 }
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java
index 3ab75d03e6..4b16a52b2a 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java
@@ -43,6 +43,7 @@ public class ConsumerConfigFactory extends AbstractConfigFactory {
   public Properties makeDefaultProperties() {
     Properties props = new Properties();
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerUrl());
+    props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer");
     props.put(ConsumerConfig.GROUP_ID_CONFIG, getConfigOrDefault(protocol::getGroupId, UUID.randomUUID().toString()));
     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ENABLE_AUTO_COMMIT_CONFIG_DEFAULT);
     props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,

Reply via email to