This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch 23121
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 800f380ec36b98f69853df696036fe35c824b42e
Author: Andrea Cosentino <[email protected]>
AuthorDate: Tue Mar 3 10:56:23 2026 +0100

    CAMEL-23121 - Kafka tests are failing on Jenkins CI since upgrade to Kafka 4.2
    
    Signed-off-by: Andrea Cosentino <[email protected]>
---
 .../src/test/resources/log4j2.properties           |  3 +++
 .../infra/kafka/services/ConfluentContainer.java   | 30 +++++++++++++++++++---
 .../infra/kafka/services/StrimziContainer.java     | 27 ++++++++++++++++---
 3 files changed, 53 insertions(+), 7 deletions(-)

diff --git a/components/camel-kafka/src/test/resources/log4j2.properties b/components/camel-kafka/src/test/resources/log4j2.properties
index f3afe12f0fa0..452b67149cc4 100644
--- a/components/camel-kafka/src/test/resources/log4j2.properties
+++ b/components/camel-kafka/src/test/resources/log4j2.properties
@@ -27,3 +27,6 @@ appender.stdout.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
 
 rootLogger.level = INFO
 rootLogger.appenderRef.out.ref = out
+
+logger.kafka.name = org.apache.kafka
+logger.kafka.level = WARN
diff --git a/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/ConfluentContainer.java b/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/ConfluentContainer.java
index 95aa14e095fc..37da65ecad66 100644
--- a/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/ConfluentContainer.java
+++ b/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/ConfluentContainer.java
@@ -17,11 +17,12 @@
 
 package org.apache.camel.test.infra.kafka.services;
 
+import java.io.IOException;
+import java.net.ServerSocket;
 import java.util.UUID;
 
 import com.github.dockerjava.api.command.CreateContainerCmd;
 import org.apache.camel.test.infra.common.LocalPropertyResolver;
-import org.apache.camel.test.infra.common.services.ContainerEnvironmentUtil;
 import org.apache.camel.test.infra.kafka.common.KafkaProperties;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.Network;
@@ -45,8 +46,7 @@ public class ConfluentContainer extends GenericContainer<ConfluentContainer> {
                 .withEnv("KAFKA_BROKER_ID", "1")
                 .withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
                         
"BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT")
-                .withEnv("KAFKA_ADVERTISED_LISTENERS",
-                        String.format("PLAINTEXT://%s:9092,BROKER://%s:9093", 
getHost(), getHost()))
+                // KAFKA_ADVERTISED_LISTENERS is set dynamically in start() 
with the correct mapped port
                 .withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0")
                 .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
                 .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
@@ -86,7 +86,29 @@ public class ConfluentContainer extends GenericContainer<ConfluentContainer> {
 
     @Override
     public void start() {
-        ContainerEnvironmentUtil.configurePort(this, true, KAFKA_PORT);
+        int hostPort = resolveHostPort();
+        withEnv("KAFKA_ADVERTISED_LISTENERS",
+                String.format("PLAINTEXT://%s:%d,BROKER://localhost:9093", 
getHost(), hostPort));
         super.start();
     }
+
+    private int resolveHostPort() {
+        String suffix = ":" + KAFKA_PORT;
+        for (String binding : getPortBindings()) {
+            if (binding.endsWith(suffix)) {
+                return Integer.parseInt(binding.substring(0, 
binding.indexOf(':')));
+            }
+        }
+        int port = findFreePort();
+        addFixedExposedPort(port, KAFKA_PORT);
+        return port;
+    }
+
+    private static int findFreePort() {
+        try (ServerSocket socket = new ServerSocket(0)) {
+            return socket.getLocalPort();
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to find a free port", e);
+        }
+    }
 }
diff --git a/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/StrimziContainer.java b/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/StrimziContainer.java
index d1a0ebff7db4..6575d873b1fa 100644
--- a/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/StrimziContainer.java
+++ b/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/StrimziContainer.java
@@ -17,11 +17,12 @@
 
 package org.apache.camel.test.infra.kafka.services;
 
+import java.io.IOException;
+import java.net.ServerSocket;
 import java.util.UUID;
 
 import com.github.dockerjava.api.command.CreateContainerCmd;
 import org.apache.camel.test.infra.common.LocalPropertyResolver;
-import org.apache.camel.test.infra.common.services.ContainerEnvironmentUtil;
 import org.apache.camel.test.infra.kafka.common.KafkaProperties;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.Network;
@@ -47,7 +48,6 @@ public class StrimziContainer extends GenericContainer<StrimziContainer> {
                 .withEnv("KAFKA_NODE_ID", "1")
                 .withEnv("KAFKA_PROCESS_ROLES", "broker,controller")
                 .withEnv("KAFKA_LISTENERS", 
"PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093")
-                .withEnv("KAFKA_ADVERTISED_LISTENERS", 
String.format("PLAINTEXT://%s:9092", getHost()))
                 .withEnv("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER")
                 .withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", 
"PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT")
                 .withEnv("KAFKA_CONTROLLER_QUORUM_VOTERS", "1@localhost:9093")
@@ -84,7 +84,28 @@ public class StrimziContainer extends GenericContainer<StrimziContainer> {
 
     @Override
     public void start() {
-        ContainerEnvironmentUtil.configurePort(this, true, KAFKA_PORT);
+        int hostPort = resolveHostPort();
+        withEnv("KAFKA_ADVERTISED_LISTENERS", 
String.format("PLAINTEXT://%s:%d", getHost(), hostPort));
         super.start();
     }
+
+    private int resolveHostPort() {
+        String suffix = ":" + KAFKA_PORT;
+        for (String binding : getPortBindings()) {
+            if (binding.endsWith(suffix)) {
+                return Integer.parseInt(binding.substring(0, 
binding.indexOf(':')));
+            }
+        }
+        int port = findFreePort();
+        addFixedExposedPort(port, KAFKA_PORT);
+        return port;
+    }
+
+    private static int findFreePort() {
+        try (ServerSocket socket = new ServerSocket(0)) {
+            return socket.getLocalPort();
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to find a free port", e);
+        }
+    }
 }

Reply via email to