This is an automated email from the ASF dual-hosted git repository.
apupier pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new f8d84aa08a30 CAMEL-23121 - Configure Kafka strimzi for transaction
support in integration tests for Kafka 4.x
f8d84aa08a30 is described below
commit f8d84aa08a3040f27a80d2aaaa161109e100d21a
Author: Aurélien Pupier <[email protected]>
AuthorDate: Thu Mar 12 14:24:35 2026 +0100
CAMEL-23121 - Configure Kafka strimzi for transaction support in
integration tests for Kafka 4.x
Problem Explanation
The Kafka transaction tests were failing with Strimzi container after
upgrading to Kafka 4 due to a network exception in the
AddPartitionsToTxnManager. The error occurred because:
Missing inter-broker listener configuration: The StrimziContainer didn't
have a dedicated BROKER listener for internal broker-to-broker
communication
Transaction manager connection failure: When Kafka transactions tried to
add partitions, the AddPartitionsToTxnManager attempted to connect to
the broker using the inter-broker listener, which defaulted to
PLAINTEXT, a listener that was configured for external access only
Port mismatch: The transaction manager tried connecting to
localhost:43347 (a random port) instead of the correct internal listener
Solution Applied
Updated StrimziContainer.java to match the working configuration pattern
from ConfluentContainer.java:
Changes Made:
Added dedicated BROKER listener (line 50):
Changed from: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
Changed to:
PLAINTEXT://0.0.0.0:9092,BROKER://0.0.0.0:9093,CONTROLLER://0.0.0.0:9094
Updated listener security protocol map (line 52):
Added BROKER:PLAINTEXT mapping
Added inter-broker listener name (line 53):
New: KAFKA_INTER_BROKER_LISTENER_NAME=BROKER
Updated controller quorum voters (line 54):
Changed from: 1@localhost:9093
Changed to: 1@localhost:9094 (controller now on separate port)
Updated advertised listeners (line 89):
Added BROKER listener: BROKER://localhost:9093
Added inter.broker.listener.name override (line 67):
Ensures the configuration is properly passed to Kafka server
This fix ensures that internal broker communication (including
transaction coordination) uses the dedicated BROKER listener on
localhost:9093, while external clients connect via PLAINTEXT on the
mapped port. The configuration now properly supports Kafka 4.x
transactions in KRaft mode.
co-authored: IBM Bob IDE 1.0.0
Signed-off-by: Aurélien Pupier <[email protected]>
---
.../component/kafka/integration/KafkaWithDBTransactionIT.java | 7 -------
.../camel/test/infra/kafka/services/StrimziContainer.java | 10 ++++++----
2 files changed, 6 insertions(+), 11 deletions(-)
diff --git
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaWithDBTransactionIT.java
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaWithDBTransactionIT.java
index 16d405cd9902..bcb39ae3921e 100644
---
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaWithDBTransactionIT.java
+++
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaWithDBTransactionIT.java
@@ -33,7 +33,6 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.springframework.jdbc.core.JdbcTemplate;
@@ -166,8 +165,6 @@ public class KafkaWithDBTransactionIT extends
BaseKafkaTestSupport {
*/
@ParameterizedTest
@ValueSource(strings = { "transacted=true", "transactionalId=my-foo1",
"additionalProperties[transactional.id]=my-foo2" })
- @DisabledIfSystemProperty(named = "kafka.instance.type", matches =
"local-strimzi-container",
- disabledReason = "The test is blocked
indefinitely.")
public void transactionProducerWithDBLast(String txParam) throws Exception
{
String startEndpoint = "direct:startTxDBLast";
contextExtension.getContext().addRoutes(new RouteBuilder() {
@@ -203,8 +200,6 @@ public class KafkaWithDBTransactionIT extends
BaseKafkaTestSupport {
* the other with no transactions.
*/
@Test
- @DisabledIfSystemProperty(named = "kafka.instance.type", matches =
"local-strimzi-container",
- disabledReason = "The test is blocked
indefinitely.")
public void transactionMultipleProducersWithDBLast() throws Exception {
contextExtension.getContext().addRoutes(new RouteBuilder() {
public void configure() {
@@ -266,8 +261,6 @@ public class KafkaWithDBTransactionIT extends
BaseKafkaTestSupport {
*/
@ParameterizedTest
@ValueSource(strings = { "transacted=true", "transactionalId=my-bar1",
"additionalProperties[transactional.id]=my-bar2" })
- @DisabledIfSystemProperty(named = "kafka.instance.type", matches =
"local-strimzi-container",
- disabledReason = "The test is blocked
indefinitely.")
public void transactionProducerWithDBFirst(String txParam) throws
Exception {
String startEndpoint = "direct:startTxDBFirst";
contextExtension.getContext().addRoutes(new RouteBuilder() {
diff --git
a/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/StrimziContainer.java
b/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/StrimziContainer.java
index 0961227681c3..6d39216fbbc6 100644
---
a/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/StrimziContainer.java
+++
b/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/StrimziContainer.java
@@ -47,10 +47,11 @@ public class StrimziContainer extends
GenericContainer<StrimziContainer> {
.withExposedPorts(KAFKA_PORT)
.withEnv("KAFKA_NODE_ID", "1")
.withEnv("KAFKA_PROCESS_ROLES", "broker,controller")
- .withEnv("KAFKA_LISTENERS",
"PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093")
+ .withEnv("KAFKA_LISTENERS",
"PLAINTEXT://0.0.0.0:9092,BROKER://0.0.0.0:9093,CONTROLLER://0.0.0.0:9094")
.withEnv("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER")
- .withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
"PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT")
- .withEnv("KAFKA_CONTROLLER_QUORUM_VOTERS", "1@localhost:9093")
+ .withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
"PLAINTEXT:PLAINTEXT,BROKER:PLAINTEXT,CONTROLLER:PLAINTEXT")
+ .withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER")
+ .withEnv("KAFKA_CONTROLLER_QUORUM_VOTERS", "1@localhost:9094")
.withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
.withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
.withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
@@ -63,6 +64,7 @@ public class StrimziContainer extends
GenericContainer<StrimziContainer> {
+ "--override
listeners=${KAFKA_LISTENERS} "
+ "--override
advertised.listeners=${KAFKA_ADVERTISED_LISTENERS} "
+ "--override
listener.security.protocol.map=${KAFKA_LISTENER_SECURITY_PROTOCOL_MAP} "
+ + "--override
inter.broker.listener.name=${KAFKA_INTER_BROKER_LISTENER_NAME} "
+ "--override
controller.listener.names=${KAFKA_CONTROLLER_LISTENER_NAMES} "
+ "--override
controller.quorum.voters=${KAFKA_CONTROLLER_QUORUM_VOTERS} "
+ "--override
node.id=${KAFKA_NODE_ID} "
@@ -85,7 +87,7 @@ public class StrimziContainer extends
GenericContainer<StrimziContainer> {
@Override
public void start() {
int hostPort = resolveHostPort();
- withEnv("KAFKA_ADVERTISED_LISTENERS",
String.format("PLAINTEXT://%s:%d", getHost(), hostPort));
+ withEnv("KAFKA_ADVERTISED_LISTENERS",
String.format("PLAINTEXT://%s:%d,BROKER://localhost:9093", getHost(),
hostPort));
super.start();
}