This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 6a2aacc1479a8043280c405b5e6487ac69be5614 Author: Otavio Rodolfo Piske <[email protected]> AuthorDate: Tue May 31 10:48:33 2022 +0200 (chores) camel-kafka: fix flaky manual async commit test --- .../KafkaConsumerAsyncManualCommitIT.java | 42 ++++++++++++++++------ 1 file changed, 32 insertions(+), 10 deletions(-) diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java index f1e007482d5..bfda5a5a5e8 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java @@ -16,8 +16,9 @@ */ package org.apache.camel.component.kafka.integration; -import java.util.Collections; +import java.util.List; import java.util.Properties; +import java.util.concurrent.TimeUnit; import org.apache.camel.BindToRegistry; import org.apache.camel.Endpoint; @@ -31,15 +32,23 @@ import org.apache.camel.component.kafka.consumer.KafkaManualCommit; import org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory; import org.apache.camel.component.mock.MockEndpoint; import org.apache.kafka.clients.producer.ProducerRecord; +import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.RepeatedTest; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.TestMethodOrder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +@TestInstance(TestInstance.Lifecycle.PER_CLASS) public class KafkaConsumerAsyncManualCommitIT extends BaseEmbeddedKafkaTestSupport { public static final String TOPIC = "testManualCommitTest"; @@ -74,8 +83,6 @@ public class KafkaConsumerAsyncManualCommitIT extends BaseEmbeddedKafkaTestSuppo if (producer != null) { producer.close(); } - // clean all test topics - kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)); } @Override @@ -111,12 +118,13 @@ public class KafkaConsumerAsyncManualCommitIT extends BaseEmbeddedKafkaTestSuppo }; } - @RepeatedTest(4) - public void kafkaManualCommit() throws Exception { + @DisplayName("Tests that LAST_RECORD_BEFORE_COMMIT header includes a value") + @Order(1) + @Test + void testLastRecordBeforeCommitHeader() { to.expectedMessageCount(5); to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", "message-2", "message-3", "message-4"); - // The LAST_RECORD_BEFORE_COMMIT header should include a value as we use - // manual commit + to.allMessages().header(KafkaConstants.LAST_RECORD_BEFORE_COMMIT).isNotNull(); for (int k = 0; k < 5; k++) { @@ -125,8 +133,16 @@ public class KafkaConsumerAsyncManualCommitIT extends BaseEmbeddedKafkaTestSuppo producer.send(data); } - to.assertIsSatisfied(3000); + Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> to.assertIsSatisfied()); + + List<Exchange> exchangeList = to.getExchanges(); + assertEquals(5, exchangeList.size()); + assertEquals(true, exchangeList.get(4).getMessage().getHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, Boolean.class)); + } + @Order(2) + @Test + void kafkaManualCommit() throws Exception { to.reset(); // Second step: We shut down our route, we expect nothing will be recovered by our route @@ -141,16 +157,22 @@ public class KafkaConsumerAsyncManualCommitIT extends BaseEmbeddedKafkaTestSuppo } to.assertIsSatisfied(3000); + } + @Order(3) + @Test + void testResumeFromTheRightPoint() throws Exception { to.reset(); // Fourth step: We start again our route, since we have been committing the offsets from the first step, // we will expect to consume from the latest committed offset e.g from offset 5 context.getRouteController().startRoute("foo"); + to.expectedMessageCount(3); to.expectedBodiesReceivedInAnyOrder("message-5", "message-6", "message-7"); - to.assertIsSatisfied(3000); + Awaitility.await().atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> to.assertIsSatisfied()); assertEquals(0, failCount, "There should have been 0 commit failures"); }
