This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 1e3c615685a CAMEL-18253: prevent the code from reporting an invalid
number of duplicates (#7964)
1e3c615685a is described below
commit 1e3c615685ab721d4c193b705ad6a1501245b264
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Fri Jul 1 12:39:33 2022 +0200
CAMEL-18253: prevent the code from reporting an invalid number of
duplicates (#7964)
This removes the counter from the KafkaIdempotentRepository since
duplicates are already correctly accounted for in the IdempotentConsumer class
---
.../idempotent/kafka/KafkaIdempotentRepository.java | 15 +--------------
.../kafka/KafkaIdempotentRepositoryEagerIT.java | 7 -------
.../kafka/KafkaIdempotentRepositoryNonEagerIT.java | 11 ++++-------
.../kafka/KafkaIdempotentRepositoryPersistenceIT.java | 14 +-------------
4 files changed, 6 insertions(+), 41 deletions(-)
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
index e7f0d9d89a5..abb3b597697 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
@@ -27,7 +27,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
@@ -80,8 +79,6 @@ public class KafkaIdempotentRepository extends ServiceSupport
implements Idempot
private final Logger log = LoggerFactory.getLogger(this.getClass());
- private final AtomicLong duplicateCount = new AtomicLong();
-
// configurable
private String topic;
private String bootstrapServers;
@@ -348,7 +345,6 @@ public class KafkaIdempotentRepository extends
ServiceSupport implements Idempot
@Override
public boolean add(String key) {
if (cache.containsKey(key)) {
- duplicateCount.incrementAndGet();
return false;
} else {
// update the local cache and broadcast the addition on the topic,
@@ -374,11 +370,7 @@ public class KafkaIdempotentRepository extends
ServiceSupport implements Idempot
@ManagedOperation(description = "Does the store contain the given key")
public boolean contains(String key) {
log.debug("Checking cache for key:{}", key);
- boolean containsKey = cache.containsKey(key);
- if (containsKey) {
- duplicateCount.incrementAndGet();
- }
- return containsKey;
+ return cache.containsKey(key);
}
@Override
@@ -402,11 +394,6 @@ public class KafkaIdempotentRepository extends
ServiceSupport implements Idempot
broadcastAction(null, CacheAction.clear);
}
- @ManagedOperation(description = "Number of times duplicate messages have
been detected")
- public long getDuplicateCount() {
- return duplicateCount.get();
- }
-
@ManagedOperation(description = "Number of times duplicate messages have
been detected")
public boolean isPollerRunning() {
return topicPoller.isRunning();
diff --git
a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerIT.java
b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerIT.java
index 853cea1ffd9..0b8104be695 100644
---
a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerIT.java
+++
b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerIT.java
@@ -66,8 +66,6 @@ public class KafkaIdempotentRepositoryEagerIT extends
BaseEmbeddedKafkaTestSuppo
template.sendBodyAndHeader("direct:in", "Test message", "id", i %
5);
}
- assertEquals(5, kafkaIdempotentRepository.getDuplicateCount());
-
assertEquals(5, mockOut.getReceivedCounter());
assertEquals(10, mockBefore.getReceivedCounter());
}
@@ -89,11 +87,6 @@ public class KafkaIdempotentRepositoryEagerIT extends
BaseEmbeddedKafkaTestSuppo
}
}
- assertEquals(4, kafkaIdempotentRepository.getDuplicateCount()); //
id{0}
- // is
- // not a
- //
duplicate
-
assertEquals(6, mockOut.getReceivedCounter()); // id{0} goes through
the
// idempotency check
// twice
diff --git
a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryNonEagerIT.java
b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryNonEagerIT.java
index 07d08971032..cb2314c31be 100644
---
a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryNonEagerIT.java
+++
b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryNonEagerIT.java
@@ -17,6 +17,7 @@
package org.apache.camel.processor.idempotent.kafka;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import org.apache.camel.CamelExecutionException;
import org.apache.camel.EndpointInject;
@@ -27,6 +28,7 @@ import org.apache.camel.component.mock.MockEndpoint;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
+import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
@@ -64,9 +66,9 @@ public class KafkaIdempotentRepositoryNonEagerIT extends
BaseEmbeddedKafkaTestSu
template.sendBodyAndHeader("direct:in", "Test message", "id", i %
5);
}
- assertEquals(5, kafkaIdempotentRepository.getDuplicateCount());
+ await().atMost(5, TimeUnit.SECONDS)
+ .untilAsserted(() -> assertEquals(5,
mockOut.getReceivedCounter()));
- assertEquals(5, mockOut.getReceivedCounter());
assertEquals(10, mockBefore.getReceivedCounter());
}
@@ -87,11 +89,6 @@ public class KafkaIdempotentRepositoryNonEagerIT extends
BaseEmbeddedKafkaTestSu
}
}
- assertEquals(4, kafkaIdempotentRepository.getDuplicateCount()); //
id{0}
- // is
- // not a
- //
duplicate
-
assertEquals(6, mockOut.getReceivedCounter()); // id{0} goes through
the
// idempotency check
// twice
diff --git
a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java
b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java
index 1a495cb7f01..58ef86ee7d1 100644
---
a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java
+++
b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java
@@ -96,9 +96,6 @@ public class KafkaIdempotentRepositoryPersistenceIT extends
BaseEmbeddedKafkaTes
await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> assertEquals(count,
mockBefore.getReceivedCounter()));
- // filters second attempt with same value
- assertEquals(5, kafkaIdempotentRepository.getDuplicateCount());
-
// only first 5 records are received, the rest are filtered
assertEquals(5, mockOut.getReceivedCounter());
}
@@ -116,9 +113,6 @@ public class KafkaIdempotentRepositoryPersistenceIT extends
BaseEmbeddedKafkaTes
await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> assertEquals(count,
mockBefore.getReceivedCounter()));
- // the state from the previous test guarantees that all attempts now
are blocked
- assertEquals(count, kafkaIdempotentRepository.getDuplicateCount());
-
// nothing pass the idempotent consumer this time
assertEquals(0, mockOut.getReceivedCounter());
}
@@ -138,9 +132,6 @@ public class KafkaIdempotentRepositoryPersistenceIT extends
BaseEmbeddedKafkaTes
await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> assertEquals(count * passes,
mockBefore.getReceivedCounter()));
- // the state from the previous test guarantees that all attempts now
are blocked
- assertEquals(count * passes,
kafkaIdempotentRepository.getDuplicateCount());
-
// nothing gets passed the idempotent consumer this time
assertEquals(0, mockOut.getReceivedCounter());
}
@@ -166,10 +157,7 @@ public class KafkaIdempotentRepositoryPersistenceIT
extends BaseEmbeddedKafkaTes
await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> assertEquals(count,
mockBefore.getReceivedCounter()));
- // there are no duplicate messages on this pass
- assertEquals(0, kafkaIdempotentRepository.getDuplicateCount());
-
- // so all of them should pass
+ // there are no duplicate messages on this run so all of them should
pass
assertEquals(count, mockOut.getReceivedCounter());
}