This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.14.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.14.x by this push:
     new 651604e1322 CAMEL-18253: prevent the code from reporting an invalid 
number of duplicates (#7964) (#7965)
651604e1322 is described below

commit 651604e13226956f45390745e35721b60ce75dac
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Fri Jul 1 13:32:13 2022 +0200

    CAMEL-18253: prevent the code from reporting an invalid number of 
duplicates (#7964) (#7965)
    
    This removes the counter from the KafkaIdempotentRepository since 
duplicates are already correctly accounted for in the IdempotentConsumer class
---
 components/camel-kafka/pom.xml                            |  6 ++++++
 .../idempotent/kafka/KafkaIdempotentRepository.java       | 15 +--------------
 .../kafka/KafkaIdempotentRepositoryEagerIT.java           |  7 -------
 .../kafka/KafkaIdempotentRepositoryNonEagerIT.java        | 11 ++++-------
 .../kafka/KafkaIdempotentRepositoryPersistenceIT.java     | 14 +-------------
 5 files changed, 12 insertions(+), 41 deletions(-)

diff --git a/components/camel-kafka/pom.xml b/components/camel-kafka/pom.xml
index 47805554308..6aae92367b4 100644
--- a/components/camel-kafka/pom.xml
+++ b/components/camel-kafka/pom.xml
@@ -51,6 +51,12 @@
         </dependency>
 
         <!-- test -->
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-test-infra-common</artifactId>
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
index e7f0d9d89a5..abb3b597697 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
@@ -27,7 +27,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
@@ -80,8 +79,6 @@ public class KafkaIdempotentRepository extends ServiceSupport 
implements Idempot
 
     private final Logger log = LoggerFactory.getLogger(this.getClass());
 
-    private final AtomicLong duplicateCount = new AtomicLong();
-
     // configurable
     private String topic;
     private String bootstrapServers;
@@ -348,7 +345,6 @@ public class KafkaIdempotentRepository extends 
ServiceSupport implements Idempot
     @Override
     public boolean add(String key) {
         if (cache.containsKey(key)) {
-            duplicateCount.incrementAndGet();
             return false;
         } else {
             // update the local cache and broadcast the addition on the topic,
@@ -374,11 +370,7 @@ public class KafkaIdempotentRepository extends 
ServiceSupport implements Idempot
     @ManagedOperation(description = "Does the store contain the given key")
     public boolean contains(String key) {
         log.debug("Checking cache for key:{}", key);
-        boolean containsKey = cache.containsKey(key);
-        if (containsKey) {
-            duplicateCount.incrementAndGet();
-        }
-        return containsKey;
+        return cache.containsKey(key);
     }
 
     @Override
@@ -402,11 +394,6 @@ public class KafkaIdempotentRepository extends 
ServiceSupport implements Idempot
         broadcastAction(null, CacheAction.clear);
     }
 
-    @ManagedOperation(description = "Number of times duplicate messages have 
been detected")
-    public long getDuplicateCount() {
-        return duplicateCount.get();
-    }
-
     @ManagedOperation(description = "Number of times duplicate messages have 
been detected")
     public boolean isPollerRunning() {
         return topicPoller.isRunning();
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerIT.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerIT.java
index c33ad1941f2..0890c1e556d 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerIT.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerIT.java
@@ -62,8 +62,6 @@ public class KafkaIdempotentRepositoryEagerIT extends 
BaseEmbeddedKafkaTestSuppo
             template.sendBodyAndHeader("direct:in", "Test message", "id", i % 
5);
         }
 
-        assertEquals(5, kafkaIdempotentRepository.getDuplicateCount());
-
         assertEquals(5, mockOut.getReceivedCounter());
         assertEquals(10, mockBefore.getReceivedCounter());
     }
@@ -85,11 +83,6 @@ public class KafkaIdempotentRepositoryEagerIT extends 
BaseEmbeddedKafkaTestSuppo
             }
         }
 
-        assertEquals(4, kafkaIdempotentRepository.getDuplicateCount()); // 
id{0}
-                                                                       // is
-                                                                       // not a
-                                                                       // 
duplicate
-
         assertEquals(6, mockOut.getReceivedCounter()); // id{0} goes through 
the
                                                       // idempotency check
                                                       // twice
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryNonEagerIT.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryNonEagerIT.java
index ae1a75d24b4..5df811897dd 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryNonEagerIT.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryNonEagerIT.java
@@ -17,6 +17,7 @@
 package org.apache.camel.processor.idempotent.kafka;
 
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.BindToRegistry;
 import org.apache.camel.CamelExecutionException;
@@ -27,6 +28,7 @@ import 
org.apache.camel.component.kafka.integration.BaseEmbeddedKafkaTestSupport
 import org.apache.camel.component.mock.MockEndpoint;
 import org.junit.jupiter.api.Test;
 
+import static org.awaitility.Awaitility.await;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /**
@@ -64,9 +66,9 @@ public class KafkaIdempotentRepositoryNonEagerIT extends 
BaseEmbeddedKafkaTestSu
             template.sendBodyAndHeader("direct:in", "Test message", "id", i % 
5);
         }
 
-        assertEquals(5, kafkaIdempotentRepository.getDuplicateCount());
+        await().atMost(5, TimeUnit.SECONDS)
+                .untilAsserted(() -> assertEquals(5, 
mockOut.getReceivedCounter()));
 
-        assertEquals(5, mockOut.getReceivedCounter());
         assertEquals(10, mockBefore.getReceivedCounter());
     }
 
@@ -87,11 +89,6 @@ public class KafkaIdempotentRepositoryNonEagerIT extends 
BaseEmbeddedKafkaTestSu
             }
         }
 
-        assertEquals(4, kafkaIdempotentRepository.getDuplicateCount()); // 
id{0}
-                                                                       // is
-                                                                       // not a
-                                                                       // 
duplicate
-
         assertEquals(6, mockOut.getReceivedCounter()); // id{0} goes through 
the
                                                       // idempotency check
                                                       // twice
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java
index 84caf8302fe..e57b187df80 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java
@@ -91,9 +91,6 @@ public class KafkaIdempotentRepositoryPersistenceIT extends 
BaseEmbeddedKafkaTes
         // all records sent initially
         assertEquals(count, mockBefore.getReceivedCounter());
 
-        // filters second attempt with same value
-        assertEquals(5, kafkaIdempotentRepository.getDuplicateCount());
-
         // only first 5 records are received, the rest are filtered
         assertEquals(5, mockOut.getReceivedCounter());
     }
@@ -110,9 +107,6 @@ public class KafkaIdempotentRepositoryPersistenceIT extends 
BaseEmbeddedKafkaTes
         // all records sent initially
         assertEquals(count, mockBefore.getReceivedCounter());
 
-        // the state from the previous test guarantees that all attempts now 
are blocked
-        assertEquals(count, kafkaIdempotentRepository.getDuplicateCount());
-
         // nothing pass the idempotent consumer this time
         assertEquals(0, mockOut.getReceivedCounter());
     }
@@ -131,9 +125,6 @@ public class KafkaIdempotentRepositoryPersistenceIT extends 
BaseEmbeddedKafkaTes
         // all records sent initially
         assertEquals(count * passes, mockBefore.getReceivedCounter());
 
-        // the state from the previous test guarantees that all attempts now 
are blocked
-        assertEquals(count * passes, 
kafkaIdempotentRepository.getDuplicateCount());
-
         // nothing gets passed the idempotent consumer this time
         assertEquals(0, mockOut.getReceivedCounter());
     }
@@ -158,10 +149,7 @@ public class KafkaIdempotentRepositoryPersistenceIT 
extends BaseEmbeddedKafkaTes
         // all records sent initially
         assertEquals(count, mockBefore.getReceivedCounter());
 
-        // there are no duplicate messages on this pass
-        assertEquals(0, kafkaIdempotentRepository.getDuplicateCount());
-
-        // so all of them should pass
+        // there are no duplicate messages on this run so all of them should 
pass
         assertEquals(count, mockOut.getReceivedCounter());
     }
 

Reply via email to