This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 1e3c615685a CAMEL-18253: prevent the code from reporting an invalid number of duplicates (#7964)
1e3c615685a is described below

commit 1e3c615685ab721d4c193b705ad6a1501245b264
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Fri Jul 1 12:39:33 2022 +0200

    CAMEL-18253: prevent the code from reporting an invalid number of duplicates (#7964)
    
    This removes the counter from the KafkaIdempotentRepository since duplicates are already correctly accounted for in the IdempotentConsumer class.
---
 .../idempotent/kafka/KafkaIdempotentRepository.java       | 15 +--------------
 .../kafka/KafkaIdempotentRepositoryEagerIT.java           |  7 -------
 .../kafka/KafkaIdempotentRepositoryNonEagerIT.java        | 11 ++++-------
 .../kafka/KafkaIdempotentRepositoryPersistenceIT.java     | 14 +-------------
 4 files changed, 6 insertions(+), 41 deletions(-)

diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
index e7f0d9d89a5..abb3b597697 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
@@ -27,7 +27,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
@@ -80,8 +79,6 @@ public class KafkaIdempotentRepository extends ServiceSupport 
implements Idempot
 
     private final Logger log = LoggerFactory.getLogger(this.getClass());
 
-    private final AtomicLong duplicateCount = new AtomicLong();
-
     // configurable
     private String topic;
     private String bootstrapServers;
@@ -348,7 +345,6 @@ public class KafkaIdempotentRepository extends 
ServiceSupport implements Idempot
     @Override
     public boolean add(String key) {
         if (cache.containsKey(key)) {
-            duplicateCount.incrementAndGet();
             return false;
         } else {
             // update the local cache and broadcast the addition on the topic,
@@ -374,11 +370,7 @@ public class KafkaIdempotentRepository extends 
ServiceSupport implements Idempot
     @ManagedOperation(description = "Does the store contain the given key")
     public boolean contains(String key) {
         log.debug("Checking cache for key:{}", key);
-        boolean containsKey = cache.containsKey(key);
-        if (containsKey) {
-            duplicateCount.incrementAndGet();
-        }
-        return containsKey;
+        return cache.containsKey(key);
     }
 
     @Override
@@ -402,11 +394,6 @@ public class KafkaIdempotentRepository extends 
ServiceSupport implements Idempot
         broadcastAction(null, CacheAction.clear);
     }
 
-    @ManagedOperation(description = "Number of times duplicate messages have 
been detected")
-    public long getDuplicateCount() {
-        return duplicateCount.get();
-    }
-
     @ManagedOperation(description = "Number of times duplicate messages have 
been detected")
     public boolean isPollerRunning() {
         return topicPoller.isRunning();
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerIT.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerIT.java
index 853cea1ffd9..0b8104be695 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerIT.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerIT.java
@@ -66,8 +66,6 @@ public class KafkaIdempotentRepositoryEagerIT extends 
BaseEmbeddedKafkaTestSuppo
             template.sendBodyAndHeader("direct:in", "Test message", "id", i % 
5);
         }
 
-        assertEquals(5, kafkaIdempotentRepository.getDuplicateCount());
-
         assertEquals(5, mockOut.getReceivedCounter());
         assertEquals(10, mockBefore.getReceivedCounter());
     }
@@ -89,11 +87,6 @@ public class KafkaIdempotentRepositoryEagerIT extends 
BaseEmbeddedKafkaTestSuppo
             }
         }
 
-        assertEquals(4, kafkaIdempotentRepository.getDuplicateCount()); // 
id{0}
-                                                                       // is
-                                                                       // not a
-                                                                       // 
duplicate
-
         assertEquals(6, mockOut.getReceivedCounter()); // id{0} goes through 
the
                                                       // idempotency check
                                                       // twice
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryNonEagerIT.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryNonEagerIT.java
index 07d08971032..cb2314c31be 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryNonEagerIT.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryNonEagerIT.java
@@ -17,6 +17,7 @@
 package org.apache.camel.processor.idempotent.kafka;
 
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.CamelExecutionException;
 import org.apache.camel.EndpointInject;
@@ -27,6 +28,7 @@ import org.apache.camel.component.mock.MockEndpoint;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 
+import static org.awaitility.Awaitility.await;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /**
@@ -64,9 +66,9 @@ public class KafkaIdempotentRepositoryNonEagerIT extends 
BaseEmbeddedKafkaTestSu
             template.sendBodyAndHeader("direct:in", "Test message", "id", i % 
5);
         }
 
-        assertEquals(5, kafkaIdempotentRepository.getDuplicateCount());
+        await().atMost(5, TimeUnit.SECONDS)
+                .untilAsserted(() -> assertEquals(5, 
mockOut.getReceivedCounter()));
 
-        assertEquals(5, mockOut.getReceivedCounter());
         assertEquals(10, mockBefore.getReceivedCounter());
     }
 
@@ -87,11 +89,6 @@ public class KafkaIdempotentRepositoryNonEagerIT extends 
BaseEmbeddedKafkaTestSu
             }
         }
 
-        assertEquals(4, kafkaIdempotentRepository.getDuplicateCount()); // 
id{0}
-                                                                       // is
-                                                                       // not a
-                                                                       // 
duplicate
-
         assertEquals(6, mockOut.getReceivedCounter()); // id{0} goes through 
the
                                                       // idempotency check
                                                       // twice
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java
index 1a495cb7f01..58ef86ee7d1 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java
@@ -96,9 +96,6 @@ public class KafkaIdempotentRepositoryPersistenceIT extends 
BaseEmbeddedKafkaTes
         await().atMost(10, TimeUnit.SECONDS)
                 .untilAsserted(() -> assertEquals(count, 
mockBefore.getReceivedCounter()));
 
-        // filters second attempt with same value
-        assertEquals(5, kafkaIdempotentRepository.getDuplicateCount());
-
         // only first 5 records are received, the rest are filtered
         assertEquals(5, mockOut.getReceivedCounter());
     }
@@ -116,9 +113,6 @@ public class KafkaIdempotentRepositoryPersistenceIT extends 
BaseEmbeddedKafkaTes
         await().atMost(10, TimeUnit.SECONDS)
                 .untilAsserted(() -> assertEquals(count, 
mockBefore.getReceivedCounter()));
 
-        // the state from the previous test guarantees that all attempts now 
are blocked
-        assertEquals(count, kafkaIdempotentRepository.getDuplicateCount());
-
         // nothing pass the idempotent consumer this time
         assertEquals(0, mockOut.getReceivedCounter());
     }
@@ -138,9 +132,6 @@ public class KafkaIdempotentRepositoryPersistenceIT extends 
BaseEmbeddedKafkaTes
         await().atMost(10, TimeUnit.SECONDS)
                 .untilAsserted(() -> assertEquals(count * passes, 
mockBefore.getReceivedCounter()));
 
-        // the state from the previous test guarantees that all attempts now 
are blocked
-        assertEquals(count * passes, 
kafkaIdempotentRepository.getDuplicateCount());
-
         // nothing gets passed the idempotent consumer this time
         assertEquals(0, mockOut.getReceivedCounter());
     }
@@ -166,10 +157,7 @@ public class KafkaIdempotentRepositoryPersistenceIT 
extends BaseEmbeddedKafkaTes
         await().atMost(10, TimeUnit.SECONDS)
                 .untilAsserted(() -> assertEquals(count, 
mockBefore.getReceivedCounter()));
 
-        // there are no duplicate messages on this pass
-        assertEquals(0, kafkaIdempotentRepository.getDuplicateCount());
-
-        // so all of them should pass
+        // there are no duplicate messages on this run so all of them should 
pass
         assertEquals(count, mockOut.getReceivedCounter());
     }
 

Reply via email to