This is an automated email from the ASF dual-hosted git repository.

gnodet pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 5dfdbe9befc3 Replace Thread.sleep with Awaitility in camel-kafka tests (#24357)
5dfdbe9befc3 is described below

commit 5dfdbe9befc3584e597f91e96be72fa7c2ac72f3
Author: Guillaume Nodet <[email protected]>
AuthorDate: Thu Jul 2 07:28:48 2026 +0200

    Replace Thread.sleep with Awaitility in camel-kafka tests (#24357)
    
    * Replace Thread.sleep with Awaitility in camel-kafka tests
    
    Replace Thread.sleep() calls with Awaitility-based polling in three
    camel-kafka test files to improve test reliability and reduce flakiness:
    
    - KafkaProducerSaslAuthTypeIT: Replace sleep-then-poll with
      await().untilAsserted() for message consumption verification
    - KafkaTransactionIT: Replace the manual sleep-based polling loop with
      await().pollInterval().untilAsserted()
    - KafkaBatchingIntervalResetAfterIdleIT: Replace Thread.sleep for idle
      period simulation with await().pollDelay().untilAsserted()
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
    
    * Address review feedback: order-independent assertions and coupled timeouts
    
    - Sort received records by key before asserting to avoid relying on
      cross-partition delivery order (KafkaProducerSaslAuthTypeIT)
    - Derive the atMost timeout from BATCHING_INTERVAL_MS so that it always
      exceeds the pollDelay; a hard-coded atMost would time out
      deterministically if the constant were raised
      (KafkaBatchingIntervalResetAfterIdleIT)
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
    
    ---------
    
    Co-authored-by: Claude Opus 4.6 <[email protected]>
---
 .../integration/KafkaProducerSaslAuthTypeIT.java   | 67 ++++++++++++----------
 .../kafka/integration/KafkaTransactionIT.java      | 28 ++++-----
 .../KafkaBatchingIntervalResetAfterIdleIT.java     |  6 +-
 3 files changed, 54 insertions(+), 47 deletions(-)

diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaProducerSaslAuthTypeIT.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaProducerSaslAuthTypeIT.java
index d8d6fec75840..54eadecb3183 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaProducerSaslAuthTypeIT.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaProducerSaslAuthTypeIT.java
@@ -17,9 +17,13 @@
 package org.apache.camel.component.kafka.integration;
 
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.ProducerTemplate;
@@ -51,6 +55,7 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.awaitility.Awaitility.await;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -153,7 +158,7 @@ public class KafkaProducerSaslAuthTypeIT {
     @Timeout(30)
     @Order(1)
     @Test
-    public void kafkaProducerWithSaslAuthType() throws InterruptedException {
+    public void kafkaProducerWithSaslAuthType() {
         ProducerTemplate producerTemplate = 
contextExtension.getProducerTemplate();
 
         // Send messages using Camel producer with saslAuthType
@@ -161,34 +166,33 @@ public class KafkaProducerSaslAuthTypeIT {
             producerTemplate.sendBodyAndHeader("direct:start", "test-message-" 
+ i, KafkaConstants.KEY, "key-" + i);
         }
 
-        // Allow some time for messages to be sent
-        Thread.sleep(2000);
-
         // Consume messages with native Kafka consumer to verify they were 
sent correctly
-        int messageCount = 0;
-        int maxAttempts = 10;
-        int attempt = 0;
-
-        while (messageCount < 5 && attempt < maxAttempts) {
-            ConsumerRecords<String, String> records = 
consumer.poll(Duration.ofMillis(1000));
-            for (ConsumerRecord<String, String> record : records) {
-                LOG.info("Received message: key={}, value={}", record.key(), 
record.value());
-                assertNotNull(record.value());
-                assertEquals("test-message-" + messageCount, record.value());
-                assertEquals("key-" + messageCount, record.key());
-                messageCount++;
-            }
-            attempt++;
+        List<ConsumerRecord<String, String>> received = new ArrayList<>();
+
+        await().atMost(20, TimeUnit.SECONDS)
+                .untilAsserted(() -> {
+                    ConsumerRecords<String, String> records = 
consumer.poll(Duration.ofMillis(1000));
+                    for (ConsumerRecord<String, String> record : records) {
+                        LOG.info("Received message: key={}, value={}", 
record.key(), record.value());
+                        received.add(record);
+                    }
+                    assertEquals(5, received.size(), "Should have received 5 
messages");
+                });
+
+        // Sort by key to avoid relying on cross-partition delivery order
+        received.sort(Comparator.comparing(ConsumerRecord::key));
+        for (int i = 0; i < received.size(); i++) {
+            assertNotNull(received.get(i).value());
+            assertEquals("test-message-" + i, received.get(i).value());
+            assertEquals("key-" + i, received.get(i).key());
         }
-
-        assertEquals(5, messageCount, "Should have received 5 messages");
     }
 
     @DisplayName("Tests that Camel producer can send messages with headers 
using saslAuthType")
     @Timeout(30)
     @Order(2)
     @Test
-    public void kafkaProducerWithSaslAuthTypeAndHeaders() throws 
InterruptedException {
+    public void kafkaProducerWithSaslAuthTypeAndHeaders() {
         ProducerTemplate producerTemplate = 
contextExtension.getProducerTemplate();
 
         // Send a message with custom headers
@@ -197,16 +201,19 @@ public class KafkaProducerSaslAuthTypeIT {
                         KafkaConstants.KEY, "header-test-key",
                         "CustomHeader", "custom-value"));
 
-        // Allow some time for messages to be sent
-        Thread.sleep(2000);
-
-        // Consume the message
-        ConsumerRecords<String, String> records = 
consumer.poll(Duration.ofMillis(5000));
+        // Consume the message, waiting until it is available
+        List<ConsumerRecord<String, String>> received = new ArrayList<>();
 
-        assertEquals(1, records.count(), "Should have received 1 message");
+        await().atMost(20, TimeUnit.SECONDS)
+                .untilAsserted(() -> {
+                    ConsumerRecords<String, String> records = 
consumer.poll(Duration.ofMillis(1000));
+                    for (ConsumerRecord<String, String> record : records) {
+                        received.add(record);
+                    }
+                    assertEquals(1, received.size(), "Should have received 1 
message");
+                });
 
-        ConsumerRecord<String, String> record = records.iterator().next();
-        assertEquals("message-with-headers", record.value());
-        assertEquals("header-test-key", record.key());
+        assertEquals("message-with-headers", received.get(0).value());
+        assertEquals("header-test-key", received.get(0).key());
     }
 }
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaTransactionIT.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaTransactionIT.java
index 01ab645173bd..a99638795424 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaTransactionIT.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaTransactionIT.java
@@ -41,6 +41,7 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.function.Executable;
 
+import static org.awaitility.Awaitility.await;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -191,24 +192,19 @@ public class KafkaTransactionIT extends 
BaseKafkaTestSupport {
     }
 
     private void createKafkaMessageConsumer(
-            KafkaConsumer<String, String> consumerConn, String topic, 
CountDownLatch messagesLatch)
-            throws InterruptedException {
+            KafkaConsumer<String, String> consumerConn, String topic, 
CountDownLatch messagesLatch) {
 
         consumerConn.subscribe(Arrays.asList(topic));
-        boolean run = true;
-        int numberOfAttempts = 0;
-
-        while (run && numberOfAttempts < 100) {
-            ConsumerRecords<String, String> records = 
consumerConn.poll(Duration.ofMillis(100));
-            for (int i = 0; i < records.count(); i++) {
-                messagesLatch.countDown();
-                if (messagesLatch.getCount() == 0) {
-                    run = false;
-                }
-            }
-            numberOfAttempts++;
-            Thread.sleep(100);
-        }
+
+        await().atMost(20, TimeUnit.SECONDS)
+                .pollInterval(100, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> {
+                    ConsumerRecords<String, String> records = 
consumerConn.poll(Duration.ofMillis(100));
+                    for (int i = 0; i < records.count(); i++) {
+                        messagesLatch.countDown();
+                    }
+                    assertEquals(0, messagesLatch.getCount(), "All messages 
should have been consumed");
+                });
     }
 
 }
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/KafkaBatchingIntervalResetAfterIdleIT.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/KafkaBatchingIntervalResetAfterIdleIT.java
index 4a80fa327320..9b2d8f591cb2 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/KafkaBatchingIntervalResetAfterIdleIT.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/KafkaBatchingIntervalResetAfterIdleIT.java
@@ -125,7 +125,11 @@ public class KafkaBatchingIntervalResetAfterIdleIT extends 
BatchingProcessingITS
 
         // Idle for longer than batchingIntervalMs so that intervalWatch 
expires while the
         // queue is empty. This is the precondition that triggers the bug on 
the next cycle.
-        Thread.sleep(BATCHING_INTERVAL_MS + 600);
+        long idleDelayMs = BATCHING_INTERVAL_MS + 600;
+        await().pollDelay(idleDelayMs, TimeUnit.MILLISECONDS)
+                .atMost(idleDelayMs + 4000, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> assertEquals(0, to.getReceivedCounter(),
+                        "No new messages should arrive during idle period"));
 
         // Reset the poll counter before starting the post-idle phase.
         POST_IDLE_POLLS.set(0);

Reply via email to