gnodet commented on code in PR #24357:
URL: https://github.com/apache/camel/pull/24357#discussion_r3508399936


##########
components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaProducerSaslAuthTypeIT.java:
##########
@@ -153,42 +157,39 @@ public void configure() {
     @Timeout(30)
     @Order(1)
     @Test
-    public void kafkaProducerWithSaslAuthType() throws InterruptedException {
+    public void kafkaProducerWithSaslAuthType() {
         ProducerTemplate producerTemplate = 
contextExtension.getProducerTemplate();
 
         // Send messages using Camel producer with saslAuthType
         for (int i = 0; i < 5; i++) {
             producerTemplate.sendBodyAndHeader("direct:start", "test-message-" 
+ i, KafkaConstants.KEY, "key-" + i);
         }
 
-        // Allow some time for messages to be sent
-        Thread.sleep(2000);
-
         // Consume messages with native Kafka consumer to verify they were 
sent correctly
-        int messageCount = 0;
-        int maxAttempts = 10;
-        int attempt = 0;
-
-        while (messageCount < 5 && attempt < maxAttempts) {
-            ConsumerRecords<String, String> records = 
consumer.poll(Duration.ofMillis(1000));
-            for (ConsumerRecord<String, String> record : records) {
-                LOG.info("Received message: key={}, value={}", record.key(), 
record.value());
-                assertNotNull(record.value());
-                assertEquals("test-message-" + messageCount, record.value());
-                assertEquals("key-" + messageCount, record.key());
-                messageCount++;
-            }
-            attempt++;
+        List<ConsumerRecord<String, String>> received = new ArrayList<>();
+
+        await().atMost(20, TimeUnit.SECONDS)
+                .untilAsserted(() -> {
+                    ConsumerRecords<String, String> records = 
consumer.poll(Duration.ofMillis(1000));
+                    for (ConsumerRecord<String, String> record : records) {
+                        LOG.info("Received message: key={}, value={}", 
record.key(), record.value());
+                        received.add(record);
+                    }
+                    assertEquals(5, received.size(), "Should have received 5 
messages");
+                });
+
+        for (int i = 0; i < received.size(); i++) {
+            assertNotNull(received.get(i).value());
+            assertEquals("test-message-" + i, received.get(i).value());
+            assertEquals("key-" + i, received.get(i).key());
         }

Review Comment:
   _Claude Code on behalf of Guillaume Nodet_
   
   Good catch — fixed in 0fa883b. Records are now sorted by key before 
asserting, making the check order-independent across partitions.



##########
components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/KafkaBatchingIntervalResetAfterIdleIT.java:
##########
@@ -125,7 +125,10 @@ public void 
intervalWatchShouldResetWhenNewAccumulationCycleBegins() throws Exce
 
         // Idle for longer than batchingIntervalMs so that intervalWatch 
expires while the
         // queue is empty. This is the precondition that triggers the bug on 
the next cycle.
-        Thread.sleep(BATCHING_INTERVAL_MS + 600);
+        await().pollDelay(BATCHING_INTERVAL_MS + 600, TimeUnit.MILLISECONDS)
+                .atMost(5, TimeUnit.SECONDS)
+                .untilAsserted(() -> assertEquals(0, to.getReceivedCounter(),
+                        "No new messages should arrive during idle period"));

Review Comment:
   _Claude Code on behalf of Guillaume Nodet_
   
   Good point — fixed in 0fa883b. The `atMost` is now derived from 
`BATCHING_INTERVAL_MS` (`idleDelayMs + 4000`) so both stay coupled if the 
constant changes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to