gnodet commented on code in PR #24357:
URL: https://github.com/apache/camel/pull/24357#discussion_r3508399936
##########
components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaProducerSaslAuthTypeIT.java:
##########
@@ -153,42 +157,39 @@ public void configure() {
@Timeout(30)
@Order(1)
@Test
- public void kafkaProducerWithSaslAuthType() throws InterruptedException {
+ public void kafkaProducerWithSaslAuthType() {
ProducerTemplate producerTemplate = contextExtension.getProducerTemplate();
// Send messages using Camel producer with saslAuthType
for (int i = 0; i < 5; i++) {
producerTemplate.sendBodyAndHeader("direct:start", "test-message-" + i, KafkaConstants.KEY, "key-" + i);
}
- // Allow some time for messages to be sent
- Thread.sleep(2000);
-
// Consume messages with native Kafka consumer to verify they were sent correctly
- int messageCount = 0;
- int maxAttempts = 10;
- int attempt = 0;
-
- while (messageCount < 5 && attempt < maxAttempts) {
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
- for (ConsumerRecord<String, String> record : records) {
- LOG.info("Received message: key={}, value={}", record.key(), record.value());
- assertNotNull(record.value());
- assertEquals("test-message-" + messageCount, record.value());
- assertEquals("key-" + messageCount, record.key());
- messageCount++;
- }
- attempt++;
+ List<ConsumerRecord<String, String>> received = new ArrayList<>();
+
+ await().atMost(20, TimeUnit.SECONDS)
+ .untilAsserted(() -> {
+ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
+ for (ConsumerRecord<String, String> record : records) {
+ LOG.info("Received message: key={}, value={}", record.key(), record.value());
+ received.add(record);
+ }
+ assertEquals(5, received.size(), "Should have received 5 messages");
+ });
+
+ for (int i = 0; i < received.size(); i++) {
+ assertNotNull(received.get(i).value());
+ assertEquals("test-message-" + i, received.get(i).value());
+ assertEquals("key-" + i, received.get(i).key());
}
Review Comment:
_Claude Code on behalf of Guillaume Nodet_
Good catch — fixed in 0fa883b. Records are now sorted by key before
asserting, making the check order-independent across partitions.
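A minimal sketch of the order-independent check described above, not the code from 0fa883b. It assumes the records are collected first and then sorted by key before being compared with the expected sequence. `Map.Entry` stands in for Kafka's `ConsumerRecord` so the example runs without a broker, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class SortedRecordCheck {

    // Returns a copy sorted by key; the input list is left untouched.
    // Lexicographic order matches numeric order here only because the
    // suffixes are single digits (key-0 .. key-4).
    public static List<Map.Entry<String, String>> sortedByKey(
            List<Map.Entry<String, String>> received) {
        List<Map.Entry<String, String>> sorted = new ArrayList<>(received);
        sorted.sort(Map.Entry.comparingByKey());
        return sorted;
    }

    // True when the i-th record is ("key-i", "test-message-i") for every i
    // and the list holds exactly expectedCount records.
    public static boolean matchesExpected(
            List<Map.Entry<String, String>> records, int expectedCount) {
        if (records.size() != expectedCount) {
            return false;
        }
        for (int i = 0; i < expectedCount; i++) {
            Map.Entry<String, String> e = records.get(i);
            if (!("key-" + i).equals(e.getKey())
                    || !("test-message-" + i).equals(e.getValue())) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Arrival order as it might look when records come from several partitions.
        List<Map.Entry<String, String>> received = List.of(
                Map.entry("key-3", "test-message-3"),
                Map.entry("key-0", "test-message-0"),
                Map.entry("key-4", "test-message-4"),
                Map.entry("key-1", "test-message-1"),
                Map.entry("key-2", "test-message-2"));

        System.out.println(matchesExpected(sortedByKey(received), 5));
    }
}
```

Sorting a copy keeps the raw arrival order available for logging, while the assertion itself no longer depends on which partition was polled first.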
##########
components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/KafkaBatchingIntervalResetAfterIdleIT.java:
##########
@@ -125,7 +125,10 @@ public void intervalWatchShouldResetWhenNewAccumulationCycleBegins() throws Exce
// Idle for longer than batchingIntervalMs so that intervalWatch expires while the
// queue is empty. This is the precondition that triggers the bug on the next cycle.
- Thread.sleep(BATCHING_INTERVAL_MS + 600);
+ await().pollDelay(BATCHING_INTERVAL_MS + 600, TimeUnit.MILLISECONDS)
+ .atMost(5, TimeUnit.SECONDS)
+ .untilAsserted(() -> assertEquals(0, to.getReceivedCounter(),
+ "No new messages should arrive during idle period"));
Review Comment:
_Claude Code on behalf of Guillaume Nodet_
Good point — fixed in 0fa883b. The `atMost` is now derived from
`BATCHING_INTERVAL_MS` (`idleDelayMs + 4000`) so both stay coupled if the
constant changes.
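The coupling described above can be sketched as follows. This is an illustration under assumptions, not the committed change: the 600 ms margin comes from the diff and the 4000 ms headroom from the comment, while the class name, method names, and the sample interval value are hypothetical.

```java
public class IdleTimeouts {

    // Margin added on top of the batching interval (value taken from the diff).
    static final long IDLE_MARGIN_MS = 600;

    // Headroom added on top of the idle delay (value taken from the comment).
    static final long AT_MOST_HEADROOM_MS = 4000;

    // How long to stay idle so the interval has certainly expired.
    public static long idleDelayMs(long batchingIntervalMs) {
        return batchingIntervalMs + IDLE_MARGIN_MS;
    }

    // Upper bound for the wait, derived from the idle delay so the two
    // values move together when the batching interval changes.
    public static long atMostMs(long batchingIntervalMs) {
        return idleDelayMs(batchingIntervalMs) + AT_MOST_HEADROOM_MS;
    }

    public static void main(String[] args) {
        long batchingIntervalMs = 1000; // hypothetical value, not from the test
        System.out.println("idleDelayMs = " + idleDelayMs(batchingIntervalMs));
        System.out.println("atMostMs    = " + atMostMs(batchingIntervalMs));
    }
}
```

In the test these two values would feed `pollDelay(idleDelayMs, TimeUnit.MILLISECONDS)` and `atMost(atMostMs, TimeUnit.MILLISECONDS)`, so the timeout stays above the poll delay whatever the constant is set to.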