This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-3.14.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.14.x by this push:
new 651604e1322 CAMEL-18253: prevent the code from reporting an invalid
number of duplicates (#7964) (#7965)
651604e1322 is described below
commit 651604e13226956f45390745e35721b60ce75dac
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Fri Jul 1 13:32:13 2022 +0200
CAMEL-18253: prevent the code from reporting an invalid number of
duplicates (#7964) (#7965)
This removes the counter from the KafkaIdempotentRepository since
duplicates are already correctly accounted for in the IdempotentConsumer class
---
components/camel-kafka/pom.xml | 6 ++++++
.../idempotent/kafka/KafkaIdempotentRepository.java | 15 +--------------
.../kafka/KafkaIdempotentRepositoryEagerIT.java | 7 -------
.../kafka/KafkaIdempotentRepositoryNonEagerIT.java | 11 ++++-------
.../kafka/KafkaIdempotentRepositoryPersistenceIT.java | 14 +-------------
5 files changed, 12 insertions(+), 41 deletions(-)
diff --git a/components/camel-kafka/pom.xml b/components/camel-kafka/pom.xml
index 47805554308..6aae92367b4 100644
--- a/components/camel-kafka/pom.xml
+++ b/components/camel-kafka/pom.xml
@@ -51,6 +51,12 @@
</dependency>
<!-- test -->
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-test-infra-common</artifactId>
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
index e7f0d9d89a5..abb3b597697 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
@@ -27,7 +27,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
@@ -80,8 +79,6 @@ public class KafkaIdempotentRepository extends ServiceSupport
implements Idempot
private final Logger log = LoggerFactory.getLogger(this.getClass());
- private final AtomicLong duplicateCount = new AtomicLong();
-
// configurable
private String topic;
private String bootstrapServers;
@@ -348,7 +345,6 @@ public class KafkaIdempotentRepository extends
ServiceSupport implements Idempot
@Override
public boolean add(String key) {
if (cache.containsKey(key)) {
- duplicateCount.incrementAndGet();
return false;
} else {
// update the local cache and broadcast the addition on the topic,
@@ -374,11 +370,7 @@ public class KafkaIdempotentRepository extends
ServiceSupport implements Idempot
@ManagedOperation(description = "Does the store contain the given key")
public boolean contains(String key) {
log.debug("Checking cache for key:{}", key);
- boolean containsKey = cache.containsKey(key);
- if (containsKey) {
- duplicateCount.incrementAndGet();
- }
- return containsKey;
+ return cache.containsKey(key);
}
@Override
@@ -402,11 +394,6 @@ public class KafkaIdempotentRepository extends
ServiceSupport implements Idempot
broadcastAction(null, CacheAction.clear);
}
- @ManagedOperation(description = "Number of times duplicate messages have
been detected")
- public long getDuplicateCount() {
- return duplicateCount.get();
- }
-
@ManagedOperation(description = "Number of times duplicate messages have
been detected")
public boolean isPollerRunning() {
return topicPoller.isRunning();
diff --git
a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerIT.java
b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerIT.java
index c33ad1941f2..0890c1e556d 100644
---
a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerIT.java
+++
b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerIT.java
@@ -62,8 +62,6 @@ public class KafkaIdempotentRepositoryEagerIT extends
BaseEmbeddedKafkaTestSuppo
template.sendBodyAndHeader("direct:in", "Test message", "id", i %
5);
}
- assertEquals(5, kafkaIdempotentRepository.getDuplicateCount());
-
assertEquals(5, mockOut.getReceivedCounter());
assertEquals(10, mockBefore.getReceivedCounter());
}
@@ -85,11 +83,6 @@ public class KafkaIdempotentRepositoryEagerIT extends
BaseEmbeddedKafkaTestSuppo
}
}
- assertEquals(4, kafkaIdempotentRepository.getDuplicateCount()); //
id{0}
- // is
- // not a
- //
duplicate
-
assertEquals(6, mockOut.getReceivedCounter()); // id{0} goes through
the
// idempotency check
// twice
diff --git
a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryNonEagerIT.java
b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryNonEagerIT.java
index ae1a75d24b4..5df811897dd 100644
---
a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryNonEagerIT.java
+++
b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryNonEagerIT.java
@@ -17,6 +17,7 @@
package org.apache.camel.processor.idempotent.kafka;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import org.apache.camel.BindToRegistry;
import org.apache.camel.CamelExecutionException;
@@ -27,6 +28,7 @@ import
org.apache.camel.component.kafka.integration.BaseEmbeddedKafkaTestSupport
import org.apache.camel.component.mock.MockEndpoint;
import org.junit.jupiter.api.Test;
+import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
@@ -64,9 +66,9 @@ public class KafkaIdempotentRepositoryNonEagerIT extends
BaseEmbeddedKafkaTestSu
template.sendBodyAndHeader("direct:in", "Test message", "id", i %
5);
}
- assertEquals(5, kafkaIdempotentRepository.getDuplicateCount());
+ await().atMost(5, TimeUnit.SECONDS)
+ .untilAsserted(() -> assertEquals(5,
mockOut.getReceivedCounter()));
- assertEquals(5, mockOut.getReceivedCounter());
assertEquals(10, mockBefore.getReceivedCounter());
}
@@ -87,11 +89,6 @@ public class KafkaIdempotentRepositoryNonEagerIT extends
BaseEmbeddedKafkaTestSu
}
}
- assertEquals(4, kafkaIdempotentRepository.getDuplicateCount()); //
id{0}
- // is
- // not a
- //
duplicate
-
assertEquals(6, mockOut.getReceivedCounter()); // id{0} goes through
the
// idempotency check
// twice
diff --git
a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java
b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java
index 84caf8302fe..e57b187df80 100644
---
a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java
+++
b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java
@@ -91,9 +91,6 @@ public class KafkaIdempotentRepositoryPersistenceIT extends
BaseEmbeddedKafkaTes
// all records sent initially
assertEquals(count, mockBefore.getReceivedCounter());
- // filters second attempt with same value
- assertEquals(5, kafkaIdempotentRepository.getDuplicateCount());
-
// only first 5 records are received, the rest are filtered
assertEquals(5, mockOut.getReceivedCounter());
}
@@ -110,9 +107,6 @@ public class KafkaIdempotentRepositoryPersistenceIT extends
BaseEmbeddedKafkaTes
// all records sent initially
assertEquals(count, mockBefore.getReceivedCounter());
- // the state from the previous test guarantees that all attempts now
are blocked
- assertEquals(count, kafkaIdempotentRepository.getDuplicateCount());
-
// nothing pass the idempotent consumer this time
assertEquals(0, mockOut.getReceivedCounter());
}
@@ -131,9 +125,6 @@ public class KafkaIdempotentRepositoryPersistenceIT extends
BaseEmbeddedKafkaTes
// all records sent initially
assertEquals(count * passes, mockBefore.getReceivedCounter());
- // the state from the previous test guarantees that all attempts now
are blocked
- assertEquals(count * passes,
kafkaIdempotentRepository.getDuplicateCount());
-
// nothing gets passed the idempotent consumer this time
assertEquals(0, mockOut.getReceivedCounter());
}
@@ -158,10 +149,7 @@ public class KafkaIdempotentRepositoryPersistenceIT
extends BaseEmbeddedKafkaTes
// all records sent initially
assertEquals(count, mockBefore.getReceivedCounter());
- // there are no duplicate messages on this pass
- assertEquals(0, kafkaIdempotentRepository.getDuplicateCount());
-
- // so all of them should pass
+ // there are no duplicate messages on this run so all of them should
pass
assertEquals(count, mockOut.getReceivedCounter());
}