This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new d3a02cbfd8 fix(#3807): Update kafka version to 4.1.0 (#3808)
d3a02cbfd8 is described below
commit d3a02cbfd86751bd7549adec7bbf4d0994fb45e4
Author: Philipp Zehnder <[email protected]>
AuthorDate: Thu Oct 2 13:14:45 2025 +0200
fix(#3807): Update kafka version to 4.1.0 (#3808)
* fix(#3807): Update kafka version to 4.1.0
* fix(#3807): Fix kafka integration test
---
docker-compose.yml | 30 ++++++++++++---------
.../deploy/standalone/kafka/docker-compose.dev.yml | 28 ++++++++++---------
.../cli/deploy/standalone/kafka/docker-compose.yml | 31 ++++++++++++----------
installer/compose/docker-compose.full.yml | 30 ++++++++++++---------
installer/compose/docker-compose.yml | 30 ++++++++++++---------
streampipes-integration-tests/pom.xml | 4 +--
.../integration/adapters/KafkaAdapterTester.java | 8 +++---
.../integration/containers/KafkaDevContainer.java | 2 +-
...fkaContainer.java => SpKafkaTestContainer.java} | 29 +++++++++-----------
.../kafka/config/ConsumerConfigFactory.java | 1 +
10 files changed, 103 insertions(+), 90 deletions(-)
diff --git a/docker-compose.yml b/docker-compose.yml
index 6135ce4f3e..a8659193b1 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -81,22 +81,26 @@ services:
spnet:
kafka:
- image: bitnami/kafka:3.9.0
+ image: apache/kafka:4.1.0
hostname: kafka
environment:
- - KAFKA_CFG_NODE_ID=0
- - KAFKA_CFG_PROCESS_ROLES=controller,broker
- - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
- - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,OUTSIDE://:9094
- - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,OUTSIDE://localhost:9094
- - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
- - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
- - KAFKA_CFG_MESSAGE_MAX_BYTES=5000012
- - KAFKA_CFG_FETCH_MESSAGE_MAX_BYTES=5000012
- - KAFKA_CFG_REPLICA_FETCH_MAX_BYTES=10000000
- - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
+ - KAFKA_NODE_ID=0
+ - KAFKA_PROCESS_ROLES=controller,broker
+ - KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
+ - KAFKA_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
+ - KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,OUTSIDE://:9094
+ - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,OUTSIDE://localhost:9094
+ - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
+ - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
+ - KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER
+ - KAFKA_MESSAGE_MAX_BYTES=5000012
+ - KAFKA_REPLICA_FETCH_MAX_BYTES=10000000
+ - KAFKA_FETCH_MESSAGE_MAX_BYTES=5000012
+ - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
+ - KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS=1
+ - KAFKA_DEFAULT_REPLICATION_FACTOR=1
volumes:
- - kafka3:/bitnami
+ - kafka3:/var/lib/kafka/data
logging: *default-logging
networks:
spnet:
diff --git a/installer/cli/deploy/standalone/kafka/docker-compose.dev.yml
b/installer/cli/deploy/standalone/kafka/docker-compose.dev.yml
index 8ac1f2e8ba..0f43f788dc 100644
--- a/installer/cli/deploy/standalone/kafka/docker-compose.dev.yml
+++ b/installer/cli/deploy/standalone/kafka/docker-compose.dev.yml
@@ -18,17 +18,19 @@ services:
ports:
- "9094:9094"
environment:
- # KRaft settings
- - KAFKA_CFG_NODE_ID=0
- - KAFKA_CFG_PROCESS_ROLES=controller,broker
- - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
- # Listeners
- - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,OUTSIDE://:9094
- - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,OUTSIDE://localhost:9094
- - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
- - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
- - KAFKA_CFG_MESSAGE_MAX_BYTES=5000012
- - KAFKA_CFG_FETCH_MESSAGE_MAX_BYTES=5000012
- - KAFKA_CFG_REPLICA_FETCH_MAX_BYTES=10000000
- - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
+ - KAFKA_NODE_ID=0
+ - KAFKA_PROCESS_ROLES=controller,broker
+ - KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
+ - KAFKA_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
+ - KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,OUTSIDE://:9094
+ - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,OUTSIDE://localhost:9094
+ - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
+ - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
+ - KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER
+ - KAFKA_MESSAGE_MAX_BYTES=5000012
+ - KAFKA_REPLICA_FETCH_MAX_BYTES=10000000
+ - KAFKA_FETCH_MESSAGE_MAX_BYTES=5000012
+ - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
+ - KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS=1
+ - KAFKA_DEFAULT_REPLICATION_FACTOR=1
diff --git a/installer/cli/deploy/standalone/kafka/docker-compose.yml
b/installer/cli/deploy/standalone/kafka/docker-compose.yml
index a4d689d2e1..f4a28f8c9c 100644
--- a/installer/cli/deploy/standalone/kafka/docker-compose.yml
+++ b/installer/cli/deploy/standalone/kafka/docker-compose.yml
@@ -15,23 +15,26 @@
services:
kafka:
- image: bitnami/kafka:3.9.0
+ image: apache/kafka:4.1.0
hostname: kafka
environment:
- - KAFKA_CFG_NODE_ID=0
- - KAFKA_CFG_PROCESS_ROLES=controller,broker
- - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
- # Listeners
- - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
- - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
- - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
- - KAFKA_CFG_MESSAGE_MAX_BYTES=5000012
- - KAFKA_CFG_FETCH_MESSAGE_MAX_BYTES=5000012
- - KAFKA_CFG_REPLICA_FETCH_MAX_BYTES=10000000
- - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
+ - KAFKA_NODE_ID=0
+ - KAFKA_PROCESS_ROLES=controller,broker
+ - KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
+ - KAFKA_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
+ - KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,OUTSIDE://:9094
+ - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,OUTSIDE://localhost:9094
+ - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
+ - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
+ - KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER
+ - KAFKA_MESSAGE_MAX_BYTES=5000012
+ - KAFKA_REPLICA_FETCH_MAX_BYTES=10000000
+ - KAFKA_FETCH_MESSAGE_MAX_BYTES=5000012
+ - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
+ - KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS=1
+ - KAFKA_DEFAULT_REPLICATION_FACTOR=1
volumes:
- - kafka3:/bitnami
+ - kafka3:/var/lib/kafka/data
logging:
driver: "json-file"
options:
diff --git a/installer/compose/docker-compose.full.yml
b/installer/compose/docker-compose.full.yml
index 65a2e1b652..aaa3a5613e 100644
--- a/installer/compose/docker-compose.full.yml
+++ b/installer/compose/docker-compose.full.yml
@@ -64,22 +64,26 @@ services:
spnet:
kafka:
- image: bitnami/kafka:3.9.0
+ image: apache/kafka:4.1.0
hostname: kafka
environment:
- - KAFKA_CFG_NODE_ID=0
- - KAFKA_CFG_PROCESS_ROLES=controller,broker
- - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
- - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,OUTSIDE://:9094
- - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,OUTSIDE://localhost:9094
- - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
- - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
- - KAFKA_CFG_MESSAGE_MAX_BYTES=5000012
- - KAFKA_CFG_FETCH_MESSAGE_MAX_BYTES=5000012
- - KAFKA_CFG_REPLICA_FETCH_MAX_BYTES=10000000
- - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
+ - KAFKA_NODE_ID=0
+ - KAFKA_PROCESS_ROLES=controller,broker
+ - KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
+ - KAFKA_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
+ - KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,OUTSIDE://:9094
+ - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,OUTSIDE://localhost:9094
+ - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
+ - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
+ - KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER
+ - KAFKA_MESSAGE_MAX_BYTES=5000012
+ - KAFKA_REPLICA_FETCH_MAX_BYTES=10000000
+ - KAFKA_FETCH_MESSAGE_MAX_BYTES=5000012
+ - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
+ - KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS=1
+ - KAFKA_DEFAULT_REPLICATION_FACTOR=1
volumes:
- - kafka3:/bitnami
+ - kafka3:/var/lib/kafka/data
logging: *default-logging
restart: unless-stopped
networks:
diff --git a/installer/compose/docker-compose.yml
b/installer/compose/docker-compose.yml
index 338d138fcc..f0d00599db 100644
--- a/installer/compose/docker-compose.yml
+++ b/installer/compose/docker-compose.yml
@@ -64,22 +64,26 @@ services:
spnet:
kafka:
- image: bitnami/kafka:3.9.0
+ image: apache/kafka:4.1.0
hostname: kafka
environment:
- - KAFKA_CFG_NODE_ID=0
- - KAFKA_CFG_PROCESS_ROLES=controller,broker
- - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
- - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,OUTSIDE://:9094
- - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,OUTSIDE://localhost:9094
- - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
- - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
- - KAFKA_CFG_MESSAGE_MAX_BYTES=5000012
- - KAFKA_CFG_FETCH_MESSAGE_MAX_BYTES=5000012
- - KAFKA_CFG_REPLICA_FETCH_MAX_BYTES=10000000
- - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
+ - KAFKA_NODE_ID=0
+ - KAFKA_PROCESS_ROLES=controller,broker
+ - KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
+ - KAFKA_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
+ - KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,OUTSIDE://:9094
+ - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,OUTSIDE://localhost:9094
+ - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
+ - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
+ - KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER
+ - KAFKA_MESSAGE_MAX_BYTES=5000012
+ - KAFKA_REPLICA_FETCH_MAX_BYTES=10000000
+ - KAFKA_FETCH_MESSAGE_MAX_BYTES=5000012
+ - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
+ - KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS=1
+ - KAFKA_DEFAULT_REPLICATION_FACTOR=1
volumes:
- - kafka3:/bitnami
+ - kafka3:/var/lib/kafka/data
logging: *default-logging
restart: unless-stopped
networks:
diff --git a/streampipes-integration-tests/pom.xml
b/streampipes-integration-tests/pom.xml
index 55e519b062..c182db6ba0 100644
--- a/streampipes-integration-tests/pom.xml
+++ b/streampipes-integration-tests/pom.xml
@@ -109,13 +109,13 @@
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
- <version>1.19.0</version>
+ <version>1.21.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
- <version>1.19.0</version>
+ <version>1.21.3</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
diff --git
a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/KafkaAdapterTester.java
b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/KafkaAdapterTester.java
index 803b2b1693..fd49dc1728 100644
---
a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/KafkaAdapterTester.java
+++
b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/KafkaAdapterTester.java
@@ -23,8 +23,8 @@ import org.apache.streampipes.extensions.api.connect.IAdapterConfiguration;
import org.apache.streampipes.extensions.api.connect.StreamPipesAdapter;
import org.apache.streampipes.extensions.connectors.kafka.adapter.KafkaProtocol;
import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider;
-import org.apache.streampipes.integration.containers.KafkaContainer;
import org.apache.streampipes.integration.containers.KafkaDevContainer;
+import org.apache.streampipes.integration.containers.SpKafkaTestContainer;
import org.apache.streampipes.manager.template.AdapterTemplateHandler;
import org.apache.streampipes.messaging.kafka.SpKafkaProducer;
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
@@ -44,7 +44,7 @@ import java.util.Objects;
public class KafkaAdapterTester extends AdapterTesterBase {
- KafkaContainer kafkaContainer;
+ SpKafkaTestContainer kafkaContainer;
private static final String TOPIC = "test-topic";
@@ -53,7 +53,7 @@ public class KafkaAdapterTester extends AdapterTesterBase {
if (Objects.equals(System.getenv("TEST_MODE"), "dev")) {
kafkaContainer = new KafkaDevContainer();
} else {
- kafkaContainer = new KafkaContainer();
+ kafkaContainer = new SpKafkaTestContainer();
}
kafkaContainer.start();
@@ -144,7 +144,7 @@ public class KafkaAdapterTester extends AdapterTesterBase {
}
@Override
- public void publishEvents(List<Map<String, Object>> events) throws Exception {
+ public void publishEvents(List<Map<String, Object>> events) {
var publisher = getSpKafkaProducer();
var objectMapper = new ObjectMapper();
diff --git
a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/containers/KafkaDevContainer.java
b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/containers/KafkaDevContainer.java
index fa04d3fc21..0274faa22c 100644
---
a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/containers/KafkaDevContainer.java
+++
b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/containers/KafkaDevContainer.java
@@ -18,7 +18,7 @@
package org.apache.streampipes.integration.containers;
-public class KafkaDevContainer extends KafkaContainer{
+public class KafkaDevContainer extends SpKafkaTestContainer {
@Override
public void start() {
// do nothing
diff --git
a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/containers/KafkaContainer.java
b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/containers/SpKafkaTestContainer.java
similarity index 63%
rename from
streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/containers/KafkaContainer.java
rename to
streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/containers/SpKafkaTestContainer.java
index e9da5ad4a6..8b3ceec972 100644
---
a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/containers/KafkaContainer.java
+++
b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/containers/SpKafkaTestContainer.java
@@ -18,38 +18,33 @@
package org.apache.streampipes.integration.containers;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.kafka.KafkaContainer;
-public class KafkaContainer extends org.testcontainers.containers.KafkaContainer {
+public class SpKafkaTestContainer {
- Logger logger = LoggerFactory.getLogger(KafkaContainer.class);
+ private KafkaContainer kafka;
- private static final int KAFKA_PORT = 9093;
-
-
- public KafkaContainer() {
- super(DockerImageName.parse("confluentinc/cp-kafka:7.9.1"));
+ public SpKafkaTestContainer() {
+ kafka = new KafkaContainer("apache/kafka");
}
-
public void start() {
- super.start();
+ kafka.start();
}
public String getBrokerHost() {
- return getHost();
+ return kafka.getHost();
}
public Integer getBrokerPort() {
- return getMappedPort(KAFKA_PORT);
+ return kafka.getFirstMappedPort();
}
- public String getHttpUrl() {
- return "http://" + getHost() + ":" + getBrokerPort();
+ public void stop() {
+ if (kafka != null) {
+ kafka.stop();
+ }
}
-
}
diff --git
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java
index 3ab75d03e6..4b16a52b2a 100644
---
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java
+++
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java
@@ -43,6 +43,7 @@ public class ConsumerConfigFactory extends AbstractConfigFactory {
public Properties makeDefaultProperties() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerUrl());
+ props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer");
props.put(ConsumerConfig.GROUP_ID_CONFIG,
getConfigOrDefault(protocol::getGroupId, UUID.randomUUID().toString()));
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
ENABLE_AUTO_COMMIT_CONFIG_DEFAULT);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,