This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-3.7.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.7.x by this push:
new 0c124ff Fix KafkaIdempotentRepository flagging cache as ready
incorrectly (#5075)
0c124ff is described below
commit 0c124ff0881377311333fb25cc4a8d21d7f83a1d
Author: Javier Holguera <[email protected]>
AuthorDate: Thu Feb 11 16:32:51 2021 +0000
Fix KafkaIdempotentRepository flagging cache as ready incorrectly (#5075)
---
.../services/org/apache/camel/component.properties | 2 +-
.../org/apache/camel/component/kafka/kafka.json | 2 +-
.../kafka/KafkaIdempotentRepository.java | 27 ++++++++--
.../kafka/KafkaIdempotentRepositoryEagerTest.java | 6 ++-
.../KafkaIdempotentRepositoryNonEagerTest.java | 7 ++-
... KafkaIdempotentRepositoryPersistenceTest.java} | 58 +++++++++++-----------
.../src/generated/resources/metadata.json | 2 +-
7 files changed, 68 insertions(+), 36 deletions(-)
diff --git
a/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/component.properties
b/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/component.properties
index 95e9845..40484618 100644
---
a/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/component.properties
+++
b/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/component.properties
@@ -2,6 +2,6 @@
components=kafka
groupId=org.apache.camel
artifactId=camel-kafka
-version=3.7.2-SNAPSHOT
+version=3.7.3-SNAPSHOT
projectName=Camel :: Kafka
projectDescription=Camel Kafka support
diff --git
a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
index bf991c6..8eec038 100644
---
a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
+++
b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
@@ -11,7 +11,7 @@
"supportLevel": "Stable",
"groupId": "org.apache.camel",
"artifactId": "camel-kafka",
- "version": "3.7.2-SNAPSHOT",
+ "version": "3.7.3-SNAPSHOT",
"scheme": "kafka",
"extendsScheme": "",
"syntax": "kafka:topic",
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
index 123a956..7675aeb 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.processor.idempotent.kafka;
+import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
@@ -39,6 +40,7 @@ import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.StringHelper;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -46,6 +48,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
@@ -426,9 +429,27 @@ public class KafkaIdempotentRepository extends
ServiceSupport implements Idempot
@Override
public void run() {
log.debug("Subscribing consumer to {}", topic);
- consumer.subscribe(Collections.singleton(topic));
- log.debug("Seeking to beginning");
- consumer.seekToBeginning(consumer.assignment());
+ consumer.subscribe(Collections.singleton(topic), new
ConsumerRebalanceListener() {
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition>
collection) {
+ }
+
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition>
collection) {
+ // Whenever a partition is assigned, we want to consume
from the beginning to guarantee all the
+ // existing entries in the topic/partition are added to
the cache
+ log.debug("Seeking to beginning");
+ consumer.seekToBeginning(collection);
+ }
+ });
+
+ // According to the Kafka documentation: "Rebalances will only
occur during an active call to poll, so
+ // callbacks will also only be invoked during that time".
+ // We can safely trigger a poll(0) because the consumer doesn't
have any record pre-fetched.
+ log.debug("Forcing rebalance to get partitions assigned");
+ if (!consumer.poll(0).isEmpty()) {
+ throw new IllegalStateException("Firts call to Kafka
consumer.poll(0) should never return any record");
+ }
POLL_LOOP: while (running.get()) {
log.trace("Polling");
diff --git
a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerTest.java
b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerTest.java
index 183b89d..480a428 100644
---
a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerTest.java
+++
b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.processor.idempotent.kafka;
+import java.util.UUID;
+
import org.apache.camel.BindToRegistry;
import org.apache.camel.CamelExecutionException;
import org.apache.camel.EndpointInject;
@@ -31,9 +33,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
* Test for eager idempotentRepository usage.
*/
public class KafkaIdempotentRepositoryEagerTest extends BaseEmbeddedKafkaTest {
+
+ // Every instance of the repository must use a different topic to
guarantee isolation between tests
@BindToRegistry("kafkaIdempotentRepository")
private KafkaIdempotentRepository kafkaIdempotentRepository
- = new KafkaIdempotentRepository("TEST_IDEM",
getBootstrapServers());
+ = new KafkaIdempotentRepository("TEST_EAGER_" +
UUID.randomUUID().toString(), getBootstrapServers());
@EndpointInject("mock:out")
private MockEndpoint mockOut;
diff --git
a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryNonEagerTest.java
b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryNonEagerTest.java
index 9761214..f86b40e 100644
---
a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryNonEagerTest.java
+++
b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryNonEagerTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.processor.idempotent.kafka;
+import java.util.UUID;
+
import org.apache.camel.BindToRegistry;
import org.apache.camel.CamelExecutionException;
import org.apache.camel.EndpointInject;
@@ -31,9 +33,12 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
* Test for non-eager idempotentRepository usage.
*/
public class KafkaIdempotentRepositoryNonEagerTest extends
BaseEmbeddedKafkaTest {
+
+ // Every instance of the repository must use a different topic to
guarantee isolation between tests
@BindToRegistry("kafkaIdempotentRepository")
private KafkaIdempotentRepository kafkaIdempotentRepository
- = new KafkaIdempotentRepository("TEST_IDEM",
getBootstrapServers());
+ = new KafkaIdempotentRepository(
+ "TEST_NON_EAGER_" + UUID.randomUUID().toString(),
getBootstrapServers());
@EndpointInject("mock:out")
private MockEndpoint mockOut;
diff --git
a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerTest.java
b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceTest.java
similarity index 60%
copy from
components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerTest.java
copy to
components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceTest.java
index 183b89d..6577011 100644
---
a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerTest.java
+++
b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceTest.java
@@ -17,23 +17,32 @@
package org.apache.camel.processor.idempotent.kafka;
import org.apache.camel.BindToRegistry;
-import org.apache.camel.CamelExecutionException;
import org.apache.camel.EndpointInject;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.BaseEmbeddedKafkaTest;
import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
- * Test for eager idempotentRepository usage.
+ * Test whether the KafkaIdempotentRepository successfully recreates its cache
from pre-existing topics. This guarantees
+ * that the de-duplication state survives application instance restarts.
+ *
+ * This test requires running in a certain order (which isn't great for unit
testing), hence the ordering-related
+ * annotations.
*/
-public class KafkaIdempotentRepositoryEagerTest extends BaseEmbeddedKafkaTest {
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+public class KafkaIdempotentRepositoryPersistenceTest extends
BaseEmbeddedKafkaTest {
+
+ // Every instance of the repository must use a different topic to
guarantee isolation between tests
@BindToRegistry("kafkaIdempotentRepository")
private KafkaIdempotentRepository kafkaIdempotentRepository
- = new KafkaIdempotentRepository("TEST_IDEM",
getBootstrapServers());
+ = new KafkaIdempotentRepository("TEST_PERSISTENCE",
getBootstrapServers());
@EndpointInject("mock:out")
private MockEndpoint mockOut;
@@ -52,44 +61,37 @@ public class KafkaIdempotentRepositoryEagerTest extends
BaseEmbeddedKafkaTest {
};
}
+ @Order(1)
@Test
- public void testRemovesDuplicates() throws InterruptedException {
+ public void testFirstPassFiltersAsExpected() throws InterruptedException {
for (int i = 0; i < 10; i++) {
template.sendBodyAndHeader("direct:in", "Test message", "id", i %
5);
}
+ // all records sent initially
+ assertEquals(10, mockBefore.getReceivedCounter());
+
+ // filters second attempt with same value
assertEquals(5, kafkaIdempotentRepository.getDuplicateCount());
+ // only first 1-4 records are received, the rest are filtered
assertEquals(5, mockOut.getReceivedCounter());
- assertEquals(10, mockBefore.getReceivedCounter());
}
+ @Order(2)
@Test
- public void testRollsBackOnException() throws InterruptedException {
- mockOut.whenAnyExchangeReceived(exchange -> {
- int id = exchange.getIn().getHeader("id", Integer.class);
- if (id == 0) {
- throw new IllegalArgumentException("Boom!");
- }
- });
-
+ public void testSecondPassFiltersEverything() throws InterruptedException {
for (int i = 0; i < 10; i++) {
- try {
- template.sendBodyAndHeader("direct:in", "Test message", "id",
i % 5);
- } catch (CamelExecutionException cex) {
- // no-op; expected
- }
+ template.sendBodyAndHeader("direct:in", "Test message", "id", i %
5);
}
- assertEquals(4, kafkaIdempotentRepository.getDuplicateCount()); //
id{0}
- // is
- // not a
- //
duplicate
-
- assertEquals(6, mockOut.getReceivedCounter()); // id{0} goes through
the
- // idempotency check
- // twice
+ // all records sent initially
assertEquals(10, mockBefore.getReceivedCounter());
- }
+ // the state from the previous test guarantees that all attempts now
are blocked
+ assertEquals(10, kafkaIdempotentRepository.getDuplicateCount());
+
+ // nothing gets passed the idempotent consumer this time
+ assertEquals(0, mockOut.getReceivedCounter());
+ }
}
diff --git a/core/camel-componentdsl/src/generated/resources/metadata.json
b/core/camel-componentdsl/src/generated/resources/metadata.json
index 832254b..05ac7f9 100644
--- a/core/camel-componentdsl/src/generated/resources/metadata.json
+++ b/core/camel-componentdsl/src/generated/resources/metadata.json
@@ -4491,7 +4491,7 @@
"supportLevel": "Stable",
"groupId": "org.apache.camel",
"artifactId": "camel-kafka",
- "version": "3.7.2-SNAPSHOT",
+ "version": "3.7.3-SNAPSHOT",
"scheme": "kafka",
"extendsScheme": "",
"syntax": "kafka:topic",