This is an automated email from the ASF dual-hosted git repository.

apupier pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new f8d84aa08a30 CAMEL-23121 - Configure Kafka strimzi for transaction 
support in inetgration tests for Kafka 4.x
f8d84aa08a30 is described below

commit f8d84aa08a3040f27a80d2aaaa161109e100d21a
Author: AurĂ©lien Pupier <[email protected]>
AuthorDate: Thu Mar 12 14:24:35 2026 +0100

    CAMEL-23121 - Configure Kafka strimzi for transaction support in
    inetgration tests for Kafka 4.x
    
    Problem Explanation
    The Kafka transaction tests were failing with Strimzi container after
    upgrading to Kafka 4 due to a network exception in the
    AddPartitionsToTxnManager. The error occurred because:
    
    Missing inter-broker listener configuration: The StrimziContainer didn't
    have a dedicated BROKER listener for internal broker-to-broker
    communication
    Transaction manager connection failure: When Kafka transactions tried to
    add partitions, the AddPartitionsToTxnManager attempted to connect to
    the broker using the inter-broker listener, but it defaulted to
    PLAINTEXT which was configured for external access only
    Port mismatch: The transaction manager tried connecting to
    localhost:43347 (a random port) instead of the correct internal listener
    Solution Applied
    Updated StrimziContainer.java to match the working configuration pattern
    from ConfluentContainer.java:
    
    Changes Made:
    Added dedicated BROKER listener (line 50):
    
    Changed from: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
    Changed to:
    PLAINTEXT://0.0.0.0:9092,BROKER://0.0.0.0:9093,CONTROLLER://0.0.0.0:9094
    Updated listener security protocol map (line 52):
    
    Added BROKER:PLAINTEXT mapping
    Added inter-broker listener name (line 53):
    
    New: KAFKA_INTER_BROKER_LISTENER_NAME=BROKER
    Updated controller quorum voters (line 54):
    
    Changed from: 1@localhost:9093
    Changed to: 1@localhost:9094 (controller now on separate port)
    Updated advertised listeners (line 89):
    
    Added BROKER listener: BROKER://localhost:9093
    Added inter.broker.listener.name override (line 67):
    
    Ensures the configuration is properly passed to Kafka server
    This fix ensures that internal broker communication (including
    transaction coordination) uses the dedicated BROKER listener on
    localhost:9093, while external clients connect via PLAINTEXT on the
    mapped port. The configuration now properly supports Kafka 4.x
    transactions in KRaft mode.
    
    co-authored: IBM Bob IDE 1.0.0
    Signed-off-by: AurĂ©lien Pupier <[email protected]>
---
 .../component/kafka/integration/KafkaWithDBTransactionIT.java  |  7 -------
 .../camel/test/infra/kafka/services/StrimziContainer.java      | 10 ++++++----
 2 files changed, 6 insertions(+), 11 deletions(-)

diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaWithDBTransactionIT.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaWithDBTransactionIT.java
index 16d405cd9902..bcb39ae3921e 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaWithDBTransactionIT.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaWithDBTransactionIT.java
@@ -33,7 +33,6 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.springframework.jdbc.core.JdbcTemplate;
@@ -166,8 +165,6 @@ public class KafkaWithDBTransactionIT extends 
BaseKafkaTestSupport {
      */
     @ParameterizedTest
     @ValueSource(strings = { "transacted=true", "transactionalId=my-foo1", 
"additionalProperties[transactional.id]=my-foo2" })
-    @DisabledIfSystemProperty(named = "kafka.instance.type", matches = 
"local-strimzi-container",
-                              disabledReason = "The test is blocked 
indefinitely.")
     public void transactionProducerWithDBLast(String txParam) throws Exception 
{
         String startEndpoint = "direct:startTxDBLast";
         contextExtension.getContext().addRoutes(new RouteBuilder() {
@@ -203,8 +200,6 @@ public class KafkaWithDBTransactionIT extends 
BaseKafkaTestSupport {
      * the other with no transactions.
      */
     @Test
-    @DisabledIfSystemProperty(named = "kafka.instance.type", matches = 
"local-strimzi-container",
-                              disabledReason = "The test is blocked 
indefinitely.")
     public void transactionMultipleProducersWithDBLast() throws Exception {
         contextExtension.getContext().addRoutes(new RouteBuilder() {
             public void configure() {
@@ -266,8 +261,6 @@ public class KafkaWithDBTransactionIT extends 
BaseKafkaTestSupport {
      */
     @ParameterizedTest
     @ValueSource(strings = { "transacted=true", "transactionalId=my-bar1", 
"additionalProperties[transactional.id]=my-bar2" })
-    @DisabledIfSystemProperty(named = "kafka.instance.type", matches = 
"local-strimzi-container",
-                              disabledReason = "The test is blocked 
indefinitely.")
     public void transactionProducerWithDBFirst(String txParam) throws 
Exception {
         String startEndpoint = "direct:startTxDBFirst";
         contextExtension.getContext().addRoutes(new RouteBuilder() {
diff --git 
a/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/StrimziContainer.java
 
b/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/StrimziContainer.java
index 0961227681c3..6d39216fbbc6 100644
--- 
a/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/StrimziContainer.java
+++ 
b/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/StrimziContainer.java
@@ -47,10 +47,11 @@ public class StrimziContainer extends 
GenericContainer<StrimziContainer> {
                 .withExposedPorts(KAFKA_PORT)
                 .withEnv("KAFKA_NODE_ID", "1")
                 .withEnv("KAFKA_PROCESS_ROLES", "broker,controller")
-                .withEnv("KAFKA_LISTENERS", 
"PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093")
+                .withEnv("KAFKA_LISTENERS", 
"PLAINTEXT://0.0.0.0:9092,BROKER://0.0.0.0:9093,CONTROLLER://0.0.0.0:9094")
                 .withEnv("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER")
-                .withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", 
"PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT")
-                .withEnv("KAFKA_CONTROLLER_QUORUM_VOTERS", "1@localhost:9093")
+                .withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", 
"PLAINTEXT:PLAINTEXT,BROKER:PLAINTEXT,CONTROLLER:PLAINTEXT")
+                .withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER")
+                .withEnv("KAFKA_CONTROLLER_QUORUM_VOTERS", "1@localhost:9094")
                 .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
                 .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
                 .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
@@ -63,6 +64,7 @@ public class StrimziContainer extends 
GenericContainer<StrimziContainer> {
                                          + "--override 
listeners=${KAFKA_LISTENERS} "
                                          + "--override 
advertised.listeners=${KAFKA_ADVERTISED_LISTENERS} "
                                          + "--override 
listener.security.protocol.map=${KAFKA_LISTENER_SECURITY_PROTOCOL_MAP} "
+                                         + "--override 
inter.broker.listener.name=${KAFKA_INTER_BROKER_LISTENER_NAME} "
                                          + "--override 
controller.listener.names=${KAFKA_CONTROLLER_LISTENER_NAMES} "
                                          + "--override 
controller.quorum.voters=${KAFKA_CONTROLLER_QUORUM_VOTERS} "
                                          + "--override 
node.id=${KAFKA_NODE_ID} "
@@ -85,7 +87,7 @@ public class StrimziContainer extends 
GenericContainer<StrimziContainer> {
     @Override
     public void start() {
         int hostPort = resolveHostPort();
-        withEnv("KAFKA_ADVERTISED_LISTENERS", 
String.format("PLAINTEXT://%s:%d", getHost(), hostPort));
+        withEnv("KAFKA_ADVERTISED_LISTENERS", 
String.format("PLAINTEXT://%s:%d,BROKER://localhost:9093", getHost(), 
hostPort));
         super.start();
     }
 

Reply via email to