Copilot commented on code in PR #24357:
URL: https://github.com/apache/camel/pull/24357#discussion_r3508265863


##########
components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaProducerSaslAuthTypeIT.java:
##########
@@ -153,42 +157,39 @@ public void configure() {
     @Timeout(30)
     @Order(1)
     @Test
-    public void kafkaProducerWithSaslAuthType() throws InterruptedException {
+    public void kafkaProducerWithSaslAuthType() {
         ProducerTemplate producerTemplate = 
contextExtension.getProducerTemplate();
 
         // Send messages using Camel producer with saslAuthType
         for (int i = 0; i < 5; i++) {
             producerTemplate.sendBodyAndHeader("direct:start", "test-message-" 
+ i, KafkaConstants.KEY, "key-" + i);
         }
 
-        // Allow some time for messages to be sent
-        Thread.sleep(2000);
-
         // Consume messages with native Kafka consumer to verify they were 
sent correctly
-        int messageCount = 0;
-        int maxAttempts = 10;
-        int attempt = 0;
-
-        while (messageCount < 5 && attempt < maxAttempts) {
-            ConsumerRecords<String, String> records = 
consumer.poll(Duration.ofMillis(1000));
-            for (ConsumerRecord<String, String> record : records) {
-                LOG.info("Received message: key={}, value={}", record.key(), 
record.value());
-                assertNotNull(record.value());
-                assertEquals("test-message-" + messageCount, record.value());
-                assertEquals("key-" + messageCount, record.key());
-                messageCount++;
-            }
-            attempt++;
+        List<ConsumerRecord<String, String>> received = new ArrayList<>();
+
+        await().atMost(20, TimeUnit.SECONDS)
+                .untilAsserted(() -> {
+                    ConsumerRecords<String, String> records = 
consumer.poll(Duration.ofMillis(1000));
+                    for (ConsumerRecord<String, String> record : records) {
+                        LOG.info("Received message: key={}, value={}", 
record.key(), record.value());
+                        received.add(record);
+                    }
+                    assertEquals(5, received.size(), "Should have received 5 
messages");
+                });
+
+        for (int i = 0; i < received.size(); i++) {
+            assertNotNull(received.get(i).value());
+            assertEquals("test-message-" + i, received.get(i).value());
+            assertEquals("key-" + i, received.get(i).key());
         }

Review Comment:
   The assertions assume the Kafka consumer delivers records in the same order 
they were produced (index-based checks). Record iteration order is not 
guaranteed (e.g., if multiple partitions are used), which can make this test 
flaky. Sorting (or validating by key) before asserting makes the check 
order-independent.



##########
components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/KafkaBatchingIntervalResetAfterIdleIT.java:
##########
@@ -125,7 +125,10 @@ public void 
intervalWatchShouldResetWhenNewAccumulationCycleBegins() throws Exce
 
         // Idle for longer than batchingIntervalMs so that intervalWatch 
expires while the
         // queue is empty. This is the precondition that triggers the bug on 
the next cycle.
-        Thread.sleep(BATCHING_INTERVAL_MS + 600);
+        await().pollDelay(BATCHING_INTERVAL_MS + 600, TimeUnit.MILLISECONDS)
+                .atMost(5, TimeUnit.SECONDS)
+                .untilAsserted(() -> assertEquals(0, to.getReceivedCounter(),
+                        "No new messages should arrive during idle period"));

Review Comment:
   The Awaitility timeout is a fixed 5 seconds, while the poll delay is derived 
from BATCHING_INTERVAL_MS. If BATCHING_INTERVAL_MS is adjusted in the future, 
this can accidentally make pollDelay >= atMost and cause a deterministic 
timeout. Consider deriving the atMost from the same constant to keep them 
consistent.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to