This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 40169000c7a96f4fcfb42193d49d5b3745738659
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Thu Apr 21 12:53:39 2022 +0200

    (chores) camel-kafka: fix a few flaky tests
---
 .../KafkaConsumerLastRecordHeaderIT.java           | 35 +++++++++++++++-------
 .../commit/KafkaConsumerNoopCommitIT.java          |  2 +-
 2 files changed, 26 insertions(+), 11 deletions(-)

diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerLastRecordHeaderIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerLastRecordHeaderIT.java
index 5d30d5e0b71..68e098729d5 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerLastRecordHeaderIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerLastRecordHeaderIT.java
@@ -29,11 +29,14 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 
 public class KafkaConsumerLastRecordHeaderIT extends BaseEmbeddedKafkaTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerLastRecordHeaderIT.class);
     private static final String TOPIC = "last-record";
 
     @EndpointInject("mock:result")
@@ -58,7 +61,7 @@ public class KafkaConsumerLastRecordHeaderIT extends BaseEmbeddedKafkaTestSuppor
 
     /**
      * When consuming data with autoCommitEnable=false Then the LAST_RECORD_BEFORE_COMMIT header must be always defined
-     * And it should be true only for the last one
+     * And it should be true only for the last one of batch of polled records
      */
     @Test
     public void shouldStartFromBeginningWithEmptyOffsetRepository() throws InterruptedException {
@@ -69,17 +72,28 @@ public class KafkaConsumerLastRecordHeaderIT extends BaseEmbeddedKafkaTestSuppor
             producer.send(new ProducerRecord<>(TOPIC, "1", "message-" + i));
         }
 
-        result.assertIsSatisfied(3000);
+        result.assertIsSatisfied(5000);
 
         List<Exchange> exchanges = result.getExchanges();
+        LOG.debug("There are {} exchanges in the result", exchanges.size());
+
         for (int i = 0; i < exchanges.size(); i++) {
-            Boolean header = exchanges.get(i).getIn().getHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, Boolean.class);
-            assertNotNull(header, "Header not set for #" + i);
-            assertEquals(header, i == exchanges.size() - 1, "Header invalid for #" + i);
-            // as long as the partitions count is 1 on topic:
-            header = exchanges.get(i).getIn().getHeader(KafkaConstants.LAST_POLL_RECORD, Boolean.class);
-            assertNotNull(header, "Last record header not set for #" + i);
-            assertEquals(header, i == exchanges.size() - 1, "Last record header invalid for #" + i);
+            final Boolean lastRecordCommit
+                    = exchanges.get(i).getIn().getHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, Boolean.class);
+            final Boolean lastPollRecord = exchanges.get(i).getIn().getHeader(KafkaConstants.LAST_POLL_RECORD, Boolean.class);
+
+            LOG.debug("Processing LAST_RECORD_BEFORE_COMMIT header for {}: {} ", i, lastRecordCommit);
+            LOG.debug("Processing LAST_POLL_RECORD header for {}: {} ", i, lastPollRecord);
+
+            assertNotNull(lastRecordCommit, "Header not set for #" + i);
+            assertEquals(lastRecordCommit, i == exchanges.size() - 1 || lastPollRecord.booleanValue(),
+                    "Header invalid for #" + i);
+
+            assertNotNull(lastPollRecord, "Last record header not set for #" + i);
+
+            if (i == exchanges.size() - 1) {
+                assertEquals(lastPollRecord, i == exchanges.size() - 1, "Last record header invalid for #" + i);
+            }
         }
     }
 
@@ -88,7 +102,8 @@ public class KafkaConsumerLastRecordHeaderIT extends BaseEmbeddedKafkaTestSuppor
         return new RouteBuilder() {
             @Override
             public void configure() {
-                from("kafka:" + TOPIC + "?groupId=A&autoOffsetReset=earliest&autoCommitEnable=false").to("mock:result");
+                from("kafka:" + TOPIC + "?groupId=A&autoOffsetReset=earliest&autoCommitEnable=false")
+                        .to("mock:result");
             }
         };
     }
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerNoopCommitIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerNoopCommitIT.java
index e309e266ce3..a1daccb40df 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerNoopCommitIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerNoopCommitIT.java
@@ -36,7 +36,7 @@ public class KafkaConsumerNoopCommitIT extends BaseManualCommitTestSupport {
 
     @EndpointInject("kafka:" + TOPIC
                     + "?groupId=group1&sessionTimeoutMs=30000&autoCommitEnable=false"
-                    + "&allowManualCommit=true&autoOffsetReset=earliest")
+                    + "&allowManualCommit=true&autoOffsetReset=earliest&metadataMaxAgeMs=1000")
     private Endpoint from;
 
     @AfterEach
