This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new b9fd6d30b246 CAMEL-23121 - Kafka tests are failing on Jenkins CI since
upgrade to Kafka 4.2 (#21692)
b9fd6d30b246 is described below
commit b9fd6d30b246ab38558f65a324ceff48e3c99675
Author: Andrea Cosentino <[email protected]>
AuthorDate: Tue Mar 3 11:26:32 2026 +0100
CAMEL-23121 - Kafka tests are failing on Jenkins CI since upgrade to Kafka
4.2 (#21692)
Signed-off-by: Andrea Cosentino <[email protected]>
---
.../src/test/resources/log4j2.properties | 3 +++
.../infra/kafka/services/ConfluentContainer.java | 30 +++++++++++++++++++---
.../infra/kafka/services/StrimziContainer.java | 27 ++++++++++++++++---
3 files changed, 53 insertions(+), 7 deletions(-)
diff --git a/components/camel-kafka/src/test/resources/log4j2.properties
b/components/camel-kafka/src/test/resources/log4j2.properties
index f3afe12f0fa0..452b67149cc4 100644
--- a/components/camel-kafka/src/test/resources/log4j2.properties
+++ b/components/camel-kafka/src/test/resources/log4j2.properties
@@ -27,3 +27,6 @@ appender.stdout.layout.pattern = %d [%-15.15t] %-5p
%-30.30c{1} - %m%n
rootLogger.level = INFO
rootLogger.appenderRef.out.ref = out
+
+logger.kafka.name = org.apache.kafka
+logger.kafka.level = WARN
diff --git
a/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/ConfluentContainer.java
b/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/ConfluentContainer.java
index 95aa14e095fc..37da65ecad66 100644
---
a/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/ConfluentContainer.java
+++
b/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/ConfluentContainer.java
@@ -17,11 +17,12 @@
package org.apache.camel.test.infra.kafka.services;
+import java.io.IOException;
+import java.net.ServerSocket;
import java.util.UUID;
import com.github.dockerjava.api.command.CreateContainerCmd;
import org.apache.camel.test.infra.common.LocalPropertyResolver;
-import org.apache.camel.test.infra.common.services.ContainerEnvironmentUtil;
import org.apache.camel.test.infra.kafka.common.KafkaProperties;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
@@ -45,8 +46,7 @@ public class ConfluentContainer extends
GenericContainer<ConfluentContainer> {
.withEnv("KAFKA_BROKER_ID", "1")
.withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
"BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT")
- .withEnv("KAFKA_ADVERTISED_LISTENERS",
- String.format("PLAINTEXT://%s:9092,BROKER://%s:9093",
getHost(), getHost()))
+ // KAFKA_ADVERTISED_LISTENERS is set dynamically in start()
with the correct mapped port
.withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0")
.withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
.withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
@@ -86,7 +86,29 @@ public class ConfluentContainer extends
GenericContainer<ConfluentContainer> {
@Override
public void start() {
- ContainerEnvironmentUtil.configurePort(this, true, KAFKA_PORT);
+ int hostPort = resolveHostPort();
+ withEnv("KAFKA_ADVERTISED_LISTENERS",
+ String.format("PLAINTEXT://%s:%d,BROKER://localhost:9093",
getHost(), hostPort));
super.start();
}
+
+ private int resolveHostPort() {
+ String suffix = ":" + KAFKA_PORT;
+ for (String binding : getPortBindings()) {
+ if (binding.endsWith(suffix)) {
+ return Integer.parseInt(binding.substring(0,
binding.indexOf(':')));
+ }
+ }
+ int port = findFreePort();
+ addFixedExposedPort(port, KAFKA_PORT);
+ return port;
+ }
+
+ private static int findFreePort() {
+ try (ServerSocket socket = new ServerSocket(0)) {
+ return socket.getLocalPort();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to find a free port", e);
+ }
+ }
}
diff --git
a/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/StrimziContainer.java
b/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/StrimziContainer.java
index d1a0ebff7db4..6575d873b1fa 100644
---
a/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/StrimziContainer.java
+++
b/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/StrimziContainer.java
@@ -17,11 +17,12 @@
package org.apache.camel.test.infra.kafka.services;
+import java.io.IOException;
+import java.net.ServerSocket;
import java.util.UUID;
import com.github.dockerjava.api.command.CreateContainerCmd;
import org.apache.camel.test.infra.common.LocalPropertyResolver;
-import org.apache.camel.test.infra.common.services.ContainerEnvironmentUtil;
import org.apache.camel.test.infra.kafka.common.KafkaProperties;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
@@ -47,7 +48,6 @@ public class StrimziContainer extends
GenericContainer<StrimziContainer> {
.withEnv("KAFKA_NODE_ID", "1")
.withEnv("KAFKA_PROCESS_ROLES", "broker,controller")
.withEnv("KAFKA_LISTENERS",
"PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093")
- .withEnv("KAFKA_ADVERTISED_LISTENERS",
String.format("PLAINTEXT://%s:9092", getHost()))
.withEnv("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER")
.withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
"PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT")
.withEnv("KAFKA_CONTROLLER_QUORUM_VOTERS", "1@localhost:9093")
@@ -84,7 +84,28 @@ public class StrimziContainer extends
GenericContainer<StrimziContainer> {
@Override
public void start() {
- ContainerEnvironmentUtil.configurePort(this, true, KAFKA_PORT);
+ int hostPort = resolveHostPort();
+ withEnv("KAFKA_ADVERTISED_LISTENERS",
String.format("PLAINTEXT://%s:%d", getHost(), hostPort));
super.start();
}
+
+ private int resolveHostPort() {
+ String suffix = ":" + KAFKA_PORT;
+ for (String binding : getPortBindings()) {
+ if (binding.endsWith(suffix)) {
+ return Integer.parseInt(binding.substring(0,
binding.indexOf(':')));
+ }
+ }
+ int port = findFreePort();
+ addFixedExposedPort(port, KAFKA_PORT);
+ return port;
+ }
+
+ private static int findFreePort() {
+ try (ServerSocket socket = new ServerSocket(0)) {
+ return socket.getLocalPort();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to find a free port", e);
+ }
+ }
}