This is an automated email from the ASF dual-hosted git repository.
gnodet pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 5dfdbe9befc3 Replace Thread.sleep with Awaitility in camel-kafka tests (#24357)
5dfdbe9befc3 is described below
commit 5dfdbe9befc3584e597f91e96be72fa7c2ac72f3
Author: Guillaume Nodet <[email protected]>
AuthorDate: Thu Jul 2 07:28:48 2026 +0200
Replace Thread.sleep with Awaitility in camel-kafka tests (#24357)
* Replace Thread.sleep with Awaitility in camel-kafka tests
Replace Thread.sleep() calls with Awaitility-based polling in three
camel-kafka test files to improve test reliability and reduce flakiness:
- KafkaProducerSaslAuthTypeIT: Replace sleep-then-poll with
await().untilAsserted() for message consumption verification
- KafkaTransactionIT: Replace the manual poll-and-sleep loop with
await().pollInterval().untilAsserted()
- KafkaBatchingIntervalResetAfterIdleIT: Replace Thread.sleep for idle
period simulation with await().pollDelay().untilAsserted()
Co-Authored-By: Claude Opus 4.6 <[email protected]>
* Address review feedback: order-independent assertions and coupled timeouts
- Sort received records by key before asserting to avoid relying on
cross-partition delivery order (KafkaProducerSaslAuthTypeIT)
- Derive the atMost timeout from BATCHING_INTERVAL_MS (via the idle delay)
so the wait cannot time out deterministically if the constant is later
increased (KafkaBatchingIntervalResetAfterIdleIT)
Co-Authored-By: Claude Opus 4.6 <[email protected]>
---------
Co-authored-by: Claude Opus 4.6 <[email protected]>
---
.../integration/KafkaProducerSaslAuthTypeIT.java | 67 ++++++++++++----------
.../kafka/integration/KafkaTransactionIT.java | 28 ++++-----
.../KafkaBatchingIntervalResetAfterIdleIT.java | 6 +-
3 files changed, 54 insertions(+), 47 deletions(-)
diff --git
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaProducerSaslAuthTypeIT.java
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaProducerSaslAuthTypeIT.java
index d8d6fec75840..54eadecb3183 100644
---
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaProducerSaslAuthTypeIT.java
+++
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaProducerSaslAuthTypeIT.java
@@ -17,9 +17,13 @@
package org.apache.camel.component.kafka.integration;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.TimeUnit;
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
@@ -51,6 +55,7 @@ import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.fail;
@@ -153,7 +158,7 @@ public class KafkaProducerSaslAuthTypeIT {
@Timeout(30)
@Order(1)
@Test
- public void kafkaProducerWithSaslAuthType() throws InterruptedException {
+ public void kafkaProducerWithSaslAuthType() {
ProducerTemplate producerTemplate =
contextExtension.getProducerTemplate();
// Send messages using Camel producer with saslAuthType
@@ -161,34 +166,33 @@ public class KafkaProducerSaslAuthTypeIT {
producerTemplate.sendBodyAndHeader("direct:start", "test-message-"
+ i, KafkaConstants.KEY, "key-" + i);
}
- // Allow some time for messages to be sent
- Thread.sleep(2000);
-
// Consume messages with native Kafka consumer to verify they were
sent correctly
- int messageCount = 0;
- int maxAttempts = 10;
- int attempt = 0;
-
- while (messageCount < 5 && attempt < maxAttempts) {
- ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
- for (ConsumerRecord<String, String> record : records) {
- LOG.info("Received message: key={}, value={}", record.key(),
record.value());
- assertNotNull(record.value());
- assertEquals("test-message-" + messageCount, record.value());
- assertEquals("key-" + messageCount, record.key());
- messageCount++;
- }
- attempt++;
+ List<ConsumerRecord<String, String>> received = new ArrayList<>();
+
+ await().atMost(20, TimeUnit.SECONDS)
+ .untilAsserted(() -> {
+ ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
+ for (ConsumerRecord<String, String> record : records) {
+ LOG.info("Received message: key={}, value={}",
record.key(), record.value());
+ received.add(record);
+ }
+ assertEquals(5, received.size(), "Should have received 5
messages");
+ });
+
+ // Sort by key to avoid relying on cross-partition delivery order
+ received.sort(Comparator.comparing(ConsumerRecord::key));
+ for (int i = 0; i < received.size(); i++) {
+ assertNotNull(received.get(i).value());
+ assertEquals("test-message-" + i, received.get(i).value());
+ assertEquals("key-" + i, received.get(i).key());
}
-
- assertEquals(5, messageCount, "Should have received 5 messages");
}
@DisplayName("Tests that Camel producer can send messages with headers
using saslAuthType")
@Timeout(30)
@Order(2)
@Test
- public void kafkaProducerWithSaslAuthTypeAndHeaders() throws
InterruptedException {
+ public void kafkaProducerWithSaslAuthTypeAndHeaders() {
ProducerTemplate producerTemplate =
contextExtension.getProducerTemplate();
// Send a message with custom headers
@@ -197,16 +201,19 @@ public class KafkaProducerSaslAuthTypeIT {
KafkaConstants.KEY, "header-test-key",
"CustomHeader", "custom-value"));
- // Allow some time for messages to be sent
- Thread.sleep(2000);
-
- // Consume the message
- ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(5000));
+ // Consume the message, waiting until it is available
+ List<ConsumerRecord<String, String>> received = new ArrayList<>();
- assertEquals(1, records.count(), "Should have received 1 message");
+ await().atMost(20, TimeUnit.SECONDS)
+ .untilAsserted(() -> {
+ ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
+ for (ConsumerRecord<String, String> record : records) {
+ received.add(record);
+ }
+ assertEquals(1, received.size(), "Should have received 1
message");
+ });
- ConsumerRecord<String, String> record = records.iterator().next();
- assertEquals("message-with-headers", record.value());
- assertEquals("header-test-key", record.key());
+ assertEquals("message-with-headers", received.get(0).value());
+ assertEquals("header-test-key", received.get(0).key());
}
}
diff --git
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaTransactionIT.java
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaTransactionIT.java
index 01ab645173bd..a99638795424 100644
---
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaTransactionIT.java
+++
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaTransactionIT.java
@@ -41,6 +41,7 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;
+import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -191,24 +192,19 @@ public class KafkaTransactionIT extends
BaseKafkaTestSupport {
}
private void createKafkaMessageConsumer(
- KafkaConsumer<String, String> consumerConn, String topic,
CountDownLatch messagesLatch)
- throws InterruptedException {
+ KafkaConsumer<String, String> consumerConn, String topic,
CountDownLatch messagesLatch) {
consumerConn.subscribe(Arrays.asList(topic));
- boolean run = true;
- int numberOfAttempts = 0;
-
- while (run && numberOfAttempts < 100) {
- ConsumerRecords<String, String> records =
consumerConn.poll(Duration.ofMillis(100));
- for (int i = 0; i < records.count(); i++) {
- messagesLatch.countDown();
- if (messagesLatch.getCount() == 0) {
- run = false;
- }
- }
- numberOfAttempts++;
- Thread.sleep(100);
- }
+
+ await().atMost(20, TimeUnit.SECONDS)
+ .pollInterval(100, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> {
+ ConsumerRecords<String, String> records =
consumerConn.poll(Duration.ofMillis(100));
+ for (int i = 0; i < records.count(); i++) {
+ messagesLatch.countDown();
+ }
+ assertEquals(0, messagesLatch.getCount(), "All messages
should have been consumed");
+ });
}
}
diff --git
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/KafkaBatchingIntervalResetAfterIdleIT.java
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/KafkaBatchingIntervalResetAfterIdleIT.java
index 4a80fa327320..9b2d8f591cb2 100644
---
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/KafkaBatchingIntervalResetAfterIdleIT.java
+++
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/KafkaBatchingIntervalResetAfterIdleIT.java
@@ -125,7 +125,11 @@ public class KafkaBatchingIntervalResetAfterIdleIT extends
BatchingProcessingITS
// Idle for longer than batchingIntervalMs so that intervalWatch
expires while the
// queue is empty. This is the precondition that triggers the bug on
the next cycle.
- Thread.sleep(BATCHING_INTERVAL_MS + 600);
+ long idleDelayMs = BATCHING_INTERVAL_MS + 600;
+ await().pollDelay(idleDelayMs, TimeUnit.MILLISECONDS)
+ .atMost(idleDelayMs + 4000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> assertEquals(0, to.getReceivedCounter(),
+ "No new messages should arrive during idle period"));
// Reset the poll counter before starting the post-idle phase.
POST_IDLE_POLLS.set(0);