This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 40169000c7a96f4fcfb42193d49d5b3745738659
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Thu Apr 21 12:53:39 2022 +0200

    (chores) camel-kafka: fix a few flaky tests
---
 .../KafkaConsumerLastRecordHeaderIT.java           | 35 +++++++++++++++-------
 .../commit/KafkaConsumerNoopCommitIT.java          |  2 +-
 2 files changed, 26 insertions(+), 11 deletions(-)

diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerLastRecordHeaderIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerLastRecordHeaderIT.java
index 5d30d5e0b71..68e098729d5 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerLastRecordHeaderIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerLastRecordHeaderIT.java
@@ -29,11 +29,14 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 
 public class KafkaConsumerLastRecordHeaderIT extends 
BaseEmbeddedKafkaTestSupport {
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaConsumerLastRecordHeaderIT.class);
     private static final String TOPIC = "last-record";
 
     @EndpointInject("mock:result")
@@ -58,7 +61,7 @@ public class KafkaConsumerLastRecordHeaderIT extends 
BaseEmbeddedKafkaTestSuppor
 
     /**
      * When consuming data with autoCommitEnable=false Then the 
LAST_RECORD_BEFORE_COMMIT header must be always defined
-     * And it should be true only for the last one
+     * And it should be true only for the last one of batch of polled records
      */
     @Test
     public void shouldStartFromBeginningWithEmptyOffsetRepository() throws 
InterruptedException {
@@ -69,17 +72,28 @@ public class KafkaConsumerLastRecordHeaderIT extends 
BaseEmbeddedKafkaTestSuppor
             producer.send(new ProducerRecord<>(TOPIC, "1", "message-" + i));
         }
 
-        result.assertIsSatisfied(3000);
+        result.assertIsSatisfied(5000);
 
         List<Exchange> exchanges = result.getExchanges();
+        LOG.debug("There are {} exchanges in the result", exchanges.size());
+
         for (int i = 0; i < exchanges.size(); i++) {
-            Boolean header = 
exchanges.get(i).getIn().getHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, 
Boolean.class);
-            assertNotNull(header, "Header not set for #" + i);
-            assertEquals(header, i == exchanges.size() - 1, "Header invalid 
for #" + i);
-            // as long as the partitions count is 1 on topic:
-            header = 
exchanges.get(i).getIn().getHeader(KafkaConstants.LAST_POLL_RECORD, 
Boolean.class);
-            assertNotNull(header, "Last record header not set for #" + i);
-            assertEquals(header, i == exchanges.size() - 1, "Last record 
header invalid for #" + i);
+            final Boolean lastRecordCommit
+                    = 
exchanges.get(i).getIn().getHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, 
Boolean.class);
+            final Boolean lastPollRecord = 
exchanges.get(i).getIn().getHeader(KafkaConstants.LAST_POLL_RECORD, 
Boolean.class);
+
+            LOG.debug("Processing LAST_RECORD_BEFORE_COMMIT header for {}: {} 
", i, lastRecordCommit);
+            LOG.debug("Processing LAST_POLL_RECORD header for {}: {} ", i, 
lastPollRecord);
+
+            assertNotNull(lastRecordCommit, "Header not set for #" + i);
+            assertEquals(lastRecordCommit, i == exchanges.size() - 1 || 
lastPollRecord.booleanValue(),
+                    "Header invalid for #" + i);
+
+            assertNotNull(lastPollRecord, "Last record header not set for #" + 
i);
+
+            if (i == exchanges.size() - 1) {
+                assertEquals(lastPollRecord, i == exchanges.size() - 1, "Last 
record header invalid for #" + i);
+            }
         }
     }
 
@@ -88,7 +102,8 @@ public class KafkaConsumerLastRecordHeaderIT extends 
BaseEmbeddedKafkaTestSuppor
         return new RouteBuilder() {
             @Override
             public void configure() {
-                from("kafka:" + TOPIC + 
"?groupId=A&autoOffsetReset=earliest&autoCommitEnable=false").to("mock:result");
+                from("kafka:" + TOPIC + 
"?groupId=A&autoOffsetReset=earliest&autoCommitEnable=false")
+                        .to("mock:result");
             }
         };
     }
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerNoopCommitIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerNoopCommitIT.java
index e309e266ce3..a1daccb40df 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerNoopCommitIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerNoopCommitIT.java
@@ -36,7 +36,7 @@ public class KafkaConsumerNoopCommitIT extends 
BaseManualCommitTestSupport {
 
     @EndpointInject("kafka:" + TOPIC
                     + 
"?groupId=group1&sessionTimeoutMs=30000&autoCommitEnable=false"
-                    + "&allowManualCommit=true&autoOffsetReset=earliest")
+                    + 
"&allowManualCommit=true&autoOffsetReset=earliest&metadataMaxAgeMs=1000")
     private Endpoint from;
 
     @AfterEach

Reply via email to