This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch kafka-idem
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 2bd91f9164bacf9328dde8f4ef49894d7e4bf42e
Author: Claus Ibsen <[email protected]>
AuthorDate: Sun Jul 7 09:48:53 2024 +0200

    CAMEL-20682: camel-kafka - KafkaIdempotentRepository misses continuous updates from its topic after startup
---
 .../catalog/beans/KafkaIdempotentRepository.json   |   2 +-
 .../kafka/KafkaIdempotentRepositoryConfigurer.java |  12 ++
 .../camel/bean/KafkaIdempotentRepository.json      |   2 +-
 .../kafka/KafkaIdempotentRepository.java           | 171 ++++++++++++++-------
 4 files changed, 130 insertions(+), 57 deletions(-)

diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/beans/KafkaIdempotentRepository.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/beans/KafkaIdempotentRepository.json
index 78ec3900e64..de825e33c41 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/beans/KafkaIdempotentRepository.json
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/beans/KafkaIdempotentRepository.json
@@ -10,7 +10,7 @@
     "groupId": "org.apache.camel",
     "artifactId": "camel-kafka",
     "version": "4.7.0-SNAPSHOT",
-    "properties": { "topic": { "index": 0, "kind": "property", "displayName": 
"Topic", "required": true, "type": "string", "javaType": "java.lang.String", 
"deprecated": false, "autowired": false, "secret": false, "description": "Sets 
the name of the Kafka topic used by this idempotent repository. Each 
functionally-separate repository should use a different topic." }, 
"bootstrapServers": { "index": 1, "kind": "property", "displayName": "Bootstrap 
Servers", "required": true, "type": "strin [...]
+    "properties": { "topic": { "index": 0, "kind": "property", "displayName": 
"Topic", "required": true, "type": "string", "javaType": "java.lang.String", 
"deprecated": false, "autowired": false, "secret": false, "description": "Sets 
the name of the Kafka topic used by this idempotent repository. Each 
functionally-separate repository should use a different topic." }, 
"bootstrapServers": { "index": 1, "kind": "property", "displayName": "Bootstrap 
Servers", "required": true, "type": "strin [...]
   }
 }
 
diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryConfigurer.java b/components/camel-kafka/src/generated/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryConfigurer.java
index 2ea11d6163d..d8bc437de27 100644
--- a/components/camel-kafka/src/generated/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryConfigurer.java
+++ b/components/camel-kafka/src/generated/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryConfigurer.java
@@ -25,10 +25,14 @@ public class KafkaIdempotentRepositoryConfigurer extends org.apache.camel.suppor
         switch (ignoreCase ? name.toLowerCase() : name) {
         case "bootstrapservers":
         case "bootstrapServers": 
target.setBootstrapServers(property(camelContext, java.lang.String.class, 
value)); return true;
+        case "groupid":
+        case "groupId": target.setGroupId(property(camelContext, 
java.lang.String.class, value)); return true;
         case "maxcachesize":
         case "maxCacheSize": target.setMaxCacheSize(property(camelContext, 
int.class, value)); return true;
         case "polldurationms":
         case "pollDurationMs": target.setPollDurationMs(property(camelContext, 
int.class, value)); return true;
+        case "startuponly":
+        case "startupOnly": target.setStartupOnly(property(camelContext, 
boolean.class, value)); return true;
         case "topic": target.setTopic(property(camelContext, 
java.lang.String.class, value)); return true;
         default: return false;
         }
@@ -39,10 +43,14 @@ public class KafkaIdempotentRepositoryConfigurer extends 
org.apache.camel.suppor
         switch (ignoreCase ? name.toLowerCase() : name) {
         case "bootstrapservers":
         case "bootstrapServers": return java.lang.String.class;
+        case "groupid":
+        case "groupId": return java.lang.String.class;
         case "maxcachesize":
         case "maxCacheSize": return int.class;
         case "polldurationms":
         case "pollDurationMs": return int.class;
+        case "startuponly":
+        case "startupOnly": return boolean.class;
         case "topic": return java.lang.String.class;
         default: return null;
         }
@@ -54,10 +62,14 @@ public class KafkaIdempotentRepositoryConfigurer extends 
org.apache.camel.suppor
         switch (ignoreCase ? name.toLowerCase() : name) {
         case "bootstrapservers":
         case "bootstrapServers": return target.getBootstrapServers();
+        case "groupid":
+        case "groupId": return target.getGroupId();
         case "maxcachesize":
         case "maxCacheSize": return target.getMaxCacheSize();
         case "polldurationms":
         case "pollDurationMs": return target.getPollDurationMs();
+        case "startuponly":
+        case "startupOnly": return target.isStartupOnly();
         case "topic": return target.getTopic();
         default: return null;
         }
diff --git a/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/bean/KafkaIdempotentRepository.json b/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/bean/KafkaIdempotentRepository.json
index 78ec3900e64..de825e33c41 100644
--- a/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/bean/KafkaIdempotentRepository.json
+++ b/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/bean/KafkaIdempotentRepository.json
@@ -10,7 +10,7 @@
     "groupId": "org.apache.camel",
     "artifactId": "camel-kafka",
     "version": "4.7.0-SNAPSHOT",
-    "properties": { "topic": { "index": 0, "kind": "property", "displayName": 
"Topic", "required": true, "type": "string", "javaType": "java.lang.String", 
"deprecated": false, "autowired": false, "secret": false, "description": "Sets 
the name of the Kafka topic used by this idempotent repository. Each 
functionally-separate repository should use a different topic." }, 
"bootstrapServers": { "index": 1, "kind": "property", "displayName": "Bootstrap 
Servers", "required": true, "type": "strin [...]
+    "properties": { "topic": { "index": 0, "kind": "property", "displayName": 
"Topic", "required": true, "type": "string", "javaType": "java.lang.String", 
"deprecated": false, "autowired": false, "secret": false, "description": "Sets 
the name of the Kafka topic used by this idempotent repository. Each 
functionally-separate repository should use a different topic." }, 
"bootstrapServers": { "index": 1, "kind": "property", "displayName": "Bootstrap 
Servers", "required": true, "type": "strin [...]
   }
 }
 
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
index b5ce20da233..6a9e05db116 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
@@ -22,7 +22,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
@@ -33,10 +35,13 @@ import org.apache.camel.spi.Configurer;
 import org.apache.camel.spi.IdempotentRepository;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.support.LRUCacheFactory;
+import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.support.service.ServiceSupport;
 import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.StopWatch;
 import org.apache.camel.util.StringHelper;
+import org.apache.camel.util.TimeUtils;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -63,8 +68,7 @@ import org.slf4j.LoggerFactory;
  * instance that uses the topic (e.g. typically on different machines running 
in parallel) controls its own consumer
  * group, so in a cluster of 10 Camel processes using the same topic each will 
control its own offset. On startup, the
  * instance consumes the full content of the topic, rebuilding the cache to 
the latest state. To use, this repository
- * must be placed in the Camel registry, either manually or by registration as 
a bean in Spring/Blueprint, as it is
- * CamelContext aware.
+ * must be placed in the Camel registry.
  */
 @Metadata(label = "bean",
           description = "Idempotent repository that uses Kafka to store 
message ids. Uses a local cache of previously seen Message IDs."
@@ -81,6 +85,9 @@ public class KafkaIdempotentRepository extends ServiceSupport 
implements Idempot
     private static final int DEFAULT_POLL_DURATION_MS = 100;
 
     private CamelContext camelContext;
+    private ExecutorService executorService;
+    private TopicPoller poller;
+    private final AtomicLong cacheCounter = new AtomicLong();
     // internal properties
     private Map<String, Object> cache;
     private Consumer<String, String> consumer;
@@ -88,7 +95,6 @@ public class KafkaIdempotentRepository extends ServiceSupport 
implements Idempot
 
     private Properties producerConfig;
     private Properties consumerConfig;
-    private String groupId; // not in use
 
     // configurable
     @Metadata(description = "Sets the name of the Kafka topic used by this 
idempotent repository."
@@ -97,6 +103,9 @@ public class KafkaIdempotentRepository extends 
ServiceSupport implements Idempot
     private String topic;
     @Metadata(description = "The URL for the kafka brokers to use", required = 
true)
     private String bootstrapServers;
+    @Metadata(description = "A string that uniquely identifies the group of 
consumer processes to which this consumer belongs. By setting the"
+                            + " same group id, multiple processes can indicate 
that they are all part of the same consumer group.")
+    private String groupId;
     @Metadata(description = "Sets the maximum size of the local key cache.", 
defaultValue = "" + DEFAULT_MAXIMUM_CACHE_SIZE)
     private int maxCacheSize = DEFAULT_MAXIMUM_CACHE_SIZE;
     @Metadata(description = "Sets the poll duration of the Kafka consumer. The 
local caches are updated immediately; this value will affect"
@@ -110,6 +119,8 @@ public class KafkaIdempotentRepository extends 
ServiceSupport implements Idempot
                             + " operate in an inconsistent state relative to 
its peers until it catches up.",
               defaultValue = "" + DEFAULT_POLL_DURATION_MS)
     private int pollDurationMs = DEFAULT_POLL_DURATION_MS;
+    @Metadata(description = "Whether to sync on startup only, or to continue 
syncing while Camel is running.")
+    private boolean startupOnly;
 
     enum CacheAction {
         add,
@@ -124,10 +135,6 @@ public class KafkaIdempotentRepository extends 
ServiceSupport implements Idempot
         this(topic, bootstrapServers, DEFAULT_MAXIMUM_CACHE_SIZE, 
DEFAULT_POLL_DURATION_MS);
     }
 
-    /**
-     * @deprecated Use the constructor without groupId; the parameter groupId 
is ignored.
-     */
-    @Deprecated
     public KafkaIdempotentRepository(String topic, String bootstrapServers, 
String groupId) {
         this(topic, bootstrapServers, DEFAULT_MAXIMUM_CACHE_SIZE, 
DEFAULT_POLL_DURATION_MS, groupId);
     }
@@ -143,9 +150,6 @@ public class KafkaIdempotentRepository extends 
ServiceSupport implements Idempot
         this(topic, consumerConfig, producerConfig, 
DEFAULT_MAXIMUM_CACHE_SIZE, DEFAULT_POLL_DURATION_MS);
     }
 
-    /**
-     * @deprecated Use the constructor without groupId; the parameter groupId 
is ignored.
-     */
     @Deprecated
     public KafkaIdempotentRepository(String topic, Properties consumerConfig, 
Properties producerConfig, String groupId) {
         this(topic, consumerConfig, producerConfig, 
DEFAULT_MAXIMUM_CACHE_SIZE, DEFAULT_POLL_DURATION_MS, groupId);
@@ -160,10 +164,6 @@ public class KafkaIdempotentRepository extends 
ServiceSupport implements Idempot
         this.pollDurationMs = pollDurationMs;
     }
 
-    /**
-     * @deprecated Use the constructor without groupId; the parameter groupId 
is ignored.
-     */
-    @Deprecated
     public KafkaIdempotentRepository(String topic, String bootstrapServers, 
int maxCacheSize, int pollDurationMs,
                                      String groupId) {
         this.topic = topic;
@@ -173,10 +173,6 @@ public class KafkaIdempotentRepository extends 
ServiceSupport implements Idempot
         this.groupId = groupId;
     }
 
-    /**
-     * @deprecated Use the constructor without groupId; the parameter groupId 
is ignored.
-     */
-    @Deprecated
     public KafkaIdempotentRepository(String topic, Properties consumerConfig, 
Properties producerConfig, int maxCacheSize,
                                      int pollDurationMs, String groupId) {
         this.topic = topic;
@@ -214,6 +210,17 @@ public class KafkaIdempotentRepository extends 
ServiceSupport implements Idempot
         this.bootstrapServers = bootstrapServers;
     }
 
+    public boolean isStartupOnly() {
+        return startupOnly;
+    }
+
+    /**
+     * Whether to sync on startup only, or to continue syncing while Camel is 
running.
+     */
+    public void setStartupOnly(boolean startupOnly) {
+        this.startupOnly = startupOnly;
+    }
+
     public Properties getProducerConfig() {
         return producerConfig;
     }
@@ -225,7 +232,7 @@ public class KafkaIdempotentRepository extends 
ServiceSupport implements Idempot
      * <pre>
      * bootstrap.servers
      * </pre>
-     *
+     * <p>
      * property itself. Prefer using {@link #bootstrapServers} for default 
configuration unless you specifically need
      * non-standard configuration options such as SSL/SASL.
      *
@@ -246,7 +253,7 @@ public class KafkaIdempotentRepository extends 
ServiceSupport implements Idempot
      * <pre>
      * bootstrap.servers
      * </pre>
-     *
+     * <p>
      * property itself. Prefer using {@link #bootstrapServers} for default 
configuration unless you specifically need
      * non-standard configuration options such as SSL/SASL.
      *
@@ -290,21 +297,14 @@ public class KafkaIdempotentRepository extends 
ServiceSupport implements Idempot
         this.pollDurationMs = pollDurationMs;
     }
 
-    /**
-     * @deprecated The parameter groupId is ignored.
-     */
-    @Deprecated
     public String getGroupId() {
         return groupId;
     }
 
     /**
-     * Sets the group id of the Kafka consumer.
-     *
-     * @param      groupId The poll duration in milliseconds.
-     * @deprecated         The parameter groupId is ignored.
+     * A string that uniquely identifies the group of consumer processes to 
which this consumer belongs. By setting the
+     * same group id, multiple processes can indicate that they are all part 
of the same consumer group.
      */
-    @Deprecated
     public void setGroupId(String groupId) {
         this.groupId = groupId;
     }
@@ -330,6 +330,9 @@ public class KafkaIdempotentRepository extends 
ServiceSupport implements Idempot
             consumerConfig = new Properties();
             StringHelper.notEmpty(bootstrapServers, "bootstrapServers");
             consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers);
+            if (groupId != null) {
+                consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+            }
         }
 
         if (producerConfig == null) {
@@ -355,7 +358,34 @@ public class KafkaIdempotentRepository extends 
ServiceSupport implements Idempot
         producerConfig.putIfAbsent(ProducerConfig.BATCH_SIZE_CONFIG, "0");
         producer = new KafkaProducer<>(producerConfig);
 
-        populateCache();
+        poller = new TopicPoller();
+        ServiceHelper.startService(poller);
+        // populate cache on startup to be ready
+        StopWatch watch = new StopWatch();
+        LOG.info("Syncing KafkaIdempotentRepository from topic: {} starting", 
topic);
+        poller.run();
+        LOG.info("Syncing KafkaIdempotentRepository from topic: {} complete: 
{}", topic,
+                TimeUtils.printDuration(watch.taken(), true));
+
+        if (!startupOnly) {
+            // continue sync job in background
+            executorService
+                    = 
camelContext.getExecutorServiceManager().newSingleThreadExecutor(this, 
"KafkaIdempotentRepositorySync");
+            LOG.info("Syncing KafkaIdempotentRepository from topic: {} 
continuously using background thread", topic);
+            executorService.submit(poller);
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (executorService != null && camelContext != null) {
+            camelContext.getExecutorServiceManager().shutdown(executorService);
+            executorService = null;
+        }
+        ServiceHelper.stopService(poller);
+        IOHelper.close(consumer, "consumer", LOG);
+        IOHelper.close(producer, "producer", LOG);
+        LOG.debug("Stopped KafkaIdempotentRepository. Cache counter: {}", 
cacheCounter.get());
     }
 
     private void populateCache() {
@@ -363,7 +393,7 @@ public class KafkaIdempotentRepository extends 
ServiceSupport implements Idempot
         List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
         Collection<TopicPartition> partitions = partitionInfos.stream()
                 .map(pi -> new TopicPartition(pi.topic(), pi.partition()))
-                .collect(Collectors.toUnmodifiableList());
+                .toList();
 
         LOG.debug("Assigning consumer to partitions {}", partitions);
         consumer.assign(partitions);
@@ -379,38 +409,60 @@ public class KafkaIdempotentRepository extends 
ServiceSupport implements Idempot
                 addToCache(consumerRecord);
             }
         }
+    }
 
+    private class TopicPoller extends ServiceSupport implements Runnable {
+
+        private final AtomicBoolean init = new AtomicBoolean();
+
+        @Override
+        public void run() {
+            if (init.compareAndSet(false, true)) {
+                // sync cache on startup
+                LOG.debug("TopicPoller populating cache on startup");
+                populateCache();
+                LOG.debug("TopicPoller populated cache on startup complete");
+                return;
+            }
+
+            LOG.debug("TopicPoller running");
+            while (isRunAllowed()) {
+                try {
+                    ConsumerRecords<String, String> consumerRecords = 
consumer.poll(Duration.ofMillis(pollDurationMs));
+                    for (ConsumerRecord<String, String> consumerRecord : 
consumerRecords) {
+                        addToCache(consumerRecord);
+                    }
+                } catch (Exception e) {
+                    LOG.warn("TopicPoller error syncing due to: " + 
e.getMessage() + ". This exception is ignored.", e);
+                }
+            }
+            LOG.debug("TopicPoller stopping");
+        }
     }
 
     private void addToCache(ConsumerRecord<String, String> consumerRecord) {
-        CacheAction action = null;
+        cacheCounter.incrementAndGet();
+        CacheAction action;
         try {
             action = CacheAction.valueOf(consumerRecord.value());
+            String messageId = consumerRecord.key();
+            if (action == CacheAction.add) {
+                LOG.debug("Adding to cache messageId:{}", messageId);
+                cache.put(messageId, messageId);
+            } else if (action == CacheAction.remove) {
+                LOG.debug("Removing from cache messageId:{}", messageId);
+                cache.remove(messageId);
+            } else if (action == CacheAction.clear) {
+                cache.clear();
+            } else {
+                throw new IllegalArgumentException("Unknown action");
+            }
         } catch (IllegalArgumentException iax) {
-            LOG.error(
-                    "Unexpected action value:\"{}\" received on [topic:{}, 
partition:{}, offset:{}]. Shutting down.",
+            LOG.warn(
+                    "Unexpected action value:\"{}\" received on [topic:{}, 
partition:{}, offset:{}]. Ignoring.",
                     consumerRecord.key(), consumerRecord.topic(),
                     consumerRecord.partition(), consumerRecord.offset());
         }
-        String messageId = consumerRecord.key();
-        if (action == CacheAction.add) {
-            LOG.debug("Adding to cache messageId:{}", messageId);
-            cache.put(messageId, messageId);
-        } else if (action == CacheAction.remove) {
-            LOG.debug("Removing from cache messageId:{}", messageId);
-            cache.remove(messageId);
-        } else if (action == CacheAction.clear) {
-            cache.clear();
-        } else {
-            // this should never happen
-            throw new RuntimeException("Illegal action " + action + " for key 
" + consumerRecord.key());
-        }
-    }
-
-    @Override
-    protected void doStop() {
-        IOHelper.close(consumer, "consumer", LOG);
-        IOHelper.close(producer, "producer", LOG);
     }
 
     @Override
@@ -444,7 +496,6 @@ public class KafkaIdempotentRepository extends 
ServiceSupport implements Idempot
     @Override
     @ManagedOperation(description = "Does the store contain the given key")
     public boolean contains(String key) {
-        LOG.debug("Checking cache for key:{}", key);
         return cache.containsKey(key);
     }
 
@@ -468,4 +519,14 @@ public class KafkaIdempotentRepository extends 
ServiceSupport implements Idempot
     public void clear() {
         broadcastAction(null, CacheAction.clear);
     }
+
+    @ManagedOperation(description = "Number of sync events received from the 
kafka topic")
+    public long getCacheCounter() {
+        return cacheCounter.get();
+    }
+
+    @ManagedOperation(description = "Number of elements currently in the 
cache")
+    public long getCacheSize() {
+        return cache != null ? cache.size() : 0;
+    }
 }

Reply via email to