This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch kafka-idem in repository https://gitbox.apache.org/repos/asf/camel.git
commit 2bd91f9164bacf9328dde8f4ef49894d7e4bf42e Author: Claus Ibsen <[email protected]> AuthorDate: Sun Jul 7 09:48:53 2024 +0200 CAMEL-20682: camel-kafka - KafkaIdempotentRepository misses continuous updates from its topic after startup --- .../catalog/beans/KafkaIdempotentRepository.json | 2 +- .../kafka/KafkaIdempotentRepositoryConfigurer.java | 12 ++ .../camel/bean/KafkaIdempotentRepository.json | 2 +- .../kafka/KafkaIdempotentRepository.java | 171 ++++++++++++++------- 4 files changed, 130 insertions(+), 57 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/beans/KafkaIdempotentRepository.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/beans/KafkaIdempotentRepository.json index 78ec3900e64..de825e33c41 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/beans/KafkaIdempotentRepository.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/beans/KafkaIdempotentRepository.json @@ -10,7 +10,7 @@ "groupId": "org.apache.camel", "artifactId": "camel-kafka", "version": "4.7.0-SNAPSHOT", - "properties": { "topic": { "index": 0, "kind": "property", "displayName": "Topic", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the name of the Kafka topic used by this idempotent repository. Each functionally-separate repository should use a different topic." }, "bootstrapServers": { "index": 1, "kind": "property", "displayName": "Bootstrap Servers", "required": true, "type": "strin [...] + "properties": { "topic": { "index": 0, "kind": "property", "displayName": "Topic", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the name of the Kafka topic used by this idempotent repository. Each functionally-separate repository should use a different topic." }, "bootstrapServers": { "index": 1, "kind": "property", "displayName": "Bootstrap Servers", "required": true, "type": "strin [...] } } diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryConfigurer.java b/components/camel-kafka/src/generated/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryConfigurer.java index 2ea11d6163d..d8bc437de27 100644 --- a/components/camel-kafka/src/generated/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryConfigurer.java +++ b/components/camel-kafka/src/generated/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryConfigurer.java @@ -25,10 +25,14 @@ public class KafkaIdempotentRepositoryConfigurer extends org.apache.camel.suppor switch (ignoreCase ? name.toLowerCase() : name) { case "bootstrapservers": case "bootstrapServers": target.setBootstrapServers(property(camelContext, java.lang.String.class, value)); return true; + case "groupid": + case "groupId": target.setGroupId(property(camelContext, java.lang.String.class, value)); return true; case "maxcachesize": case "maxCacheSize": target.setMaxCacheSize(property(camelContext, int.class, value)); return true; case "polldurationms": case "pollDurationMs": target.setPollDurationMs(property(camelContext, int.class, value)); return true; + case "startuponly": + case "startupOnly": target.setStartupOnly(property(camelContext, boolean.class, value)); return true; case "topic": target.setTopic(property(camelContext, java.lang.String.class, value)); return true; default: return false; } @@ -39,10 +43,14 @@ public class KafkaIdempotentRepositoryConfigurer extends org.apache.camel.suppor switch (ignoreCase ? name.toLowerCase() : name) { case "bootstrapservers": case "bootstrapServers": return java.lang.String.class; + case "groupid": + case "groupId": return java.lang.String.class; case "maxcachesize": case "maxCacheSize": return int.class; case "polldurationms": case "pollDurationMs": return int.class; + case "startuponly": + case "startupOnly": return boolean.class; case "topic": return java.lang.String.class; default: return null; } @@ -54,10 +62,14 @@ public class KafkaIdempotentRepositoryConfigurer extends org.apache.camel.suppor switch (ignoreCase ? name.toLowerCase() : name) { case "bootstrapservers": case "bootstrapServers": return target.getBootstrapServers(); + case "groupid": + case "groupId": return target.getGroupId(); case "maxcachesize": case "maxCacheSize": return target.getMaxCacheSize(); case "polldurationms": case "pollDurationMs": return target.getPollDurationMs(); + case "startuponly": + case "startupOnly": return target.isStartupOnly(); case "topic": return target.getTopic(); default: return null; } diff --git a/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/bean/KafkaIdempotentRepository.json b/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/bean/KafkaIdempotentRepository.json index 78ec3900e64..de825e33c41 100644 --- a/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/bean/KafkaIdempotentRepository.json +++ b/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/bean/KafkaIdempotentRepository.json @@ -10,7 +10,7 @@ "groupId": "org.apache.camel", "artifactId": "camel-kafka", "version": "4.7.0-SNAPSHOT", - "properties": { "topic": { "index": 0, "kind": "property", "displayName": "Topic", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the name of the Kafka topic used by this idempotent repository. Each functionally-separate repository should use a different topic." }, "bootstrapServers": { "index": 1, "kind": "property", "displayName": "Bootstrap Servers", "required": true, "type": "strin [...] + "properties": { "topic": { "index": 0, "kind": "property", "displayName": "Topic", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the name of the Kafka topic used by this idempotent repository. Each functionally-separate repository should use a different topic." }, "bootstrapServers": { "index": 1, "kind": "property", "displayName": "Bootstrap Servers", "required": true, "type": "strin [...] } } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java index b5ce20da233..6a9e05db116 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java @@ -22,7 +22,9 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutionException; -import java.util.stream.Collectors; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import org.apache.camel.CamelContext; import org.apache.camel.CamelContextAware; @@ -33,10 +35,13 @@ import org.apache.camel.spi.Configurer; import org.apache.camel.spi.IdempotentRepository; import org.apache.camel.spi.Metadata; import org.apache.camel.support.LRUCacheFactory; +import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.support.service.ServiceSupport; import org.apache.camel.util.IOHelper; import org.apache.camel.util.ObjectHelper; +import org.apache.camel.util.StopWatch; import org.apache.camel.util.StringHelper; +import org.apache.camel.util.TimeUtils; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -63,8 +68,7 @@ import org.slf4j.LoggerFactory; * instance that uses the topic (e.g. typically on different machines running in parallel) controls its own consumer * group, so in a cluster of 10 Camel processes using the same topic each will control its own offset. On startup, the * instance consumes the full content of the topic, rebuilding the cache to the latest state. To use, this repository - * must be placed in the Camel registry, either manually or by registration as a bean in Spring/Blueprint, as it is - * CamelContext aware. + * must be placed in the Camel registry. */ @Metadata(label = "bean", description = "Idempotent repository that uses Kafka to store message ids. Uses a local cache of previously seen Message IDs." @@ -81,6 +85,9 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot private static final int DEFAULT_POLL_DURATION_MS = 100; private CamelContext camelContext; + private ExecutorService executorService; + private TopicPoller poller; + private final AtomicLong cacheCounter = new AtomicLong(); // internal properties private Map<String, Object> cache; private Consumer<String, String> consumer; @@ -88,7 +95,6 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot private Properties producerConfig; private Properties consumerConfig; - private String groupId; // not in use // configurable @Metadata(description = "Sets the name of the Kafka topic used by this idempotent repository." @@ -97,6 +103,9 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot private String topic; @Metadata(description = "The URL for the kafka brokers to use", required = true) private String bootstrapServers; + @Metadata(description = "A string that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the" + + " same group id, multiple processes can indicate that they are all part of the same consumer group.") + private String groupId; @Metadata(description = "Sets the maximum size of the local key cache.", defaultValue = "" + DEFAULT_MAXIMUM_CACHE_SIZE) private int maxCacheSize = DEFAULT_MAXIMUM_CACHE_SIZE; @Metadata(description = "Sets the poll duration of the Kafka consumer. The local caches are updated immediately; this value will affect" @@ -110,6 +119,8 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot + " operate in an inconsistent state relative to its peers until it catches up.", defaultValue = "" + DEFAULT_POLL_DURATION_MS) private int pollDurationMs = DEFAULT_POLL_DURATION_MS; + @Metadata(description = "Whether to sync on startup only, or to continue syncing while Camel is running.") + private boolean startupOnly; enum CacheAction { add, @@ -124,10 +135,6 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot this(topic, bootstrapServers, DEFAULT_MAXIMUM_CACHE_SIZE, DEFAULT_POLL_DURATION_MS); } - /** - * @deprecated Use the constructor without groupId; the parameter groupId is ignored. - */ - @Deprecated public KafkaIdempotentRepository(String topic, String bootstrapServers, String groupId) { this(topic, bootstrapServers, DEFAULT_MAXIMUM_CACHE_SIZE, DEFAULT_POLL_DURATION_MS, groupId); } @@ -143,9 +150,6 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot this(topic, consumerConfig, producerConfig, DEFAULT_MAXIMUM_CACHE_SIZE, DEFAULT_POLL_DURATION_MS); } - /** - * @deprecated Use the constructor without groupId; the parameter groupId is ignored. - */ @Deprecated public KafkaIdempotentRepository(String topic, Properties consumerConfig, Properties producerConfig, String groupId) { this(topic, consumerConfig, producerConfig, DEFAULT_MAXIMUM_CACHE_SIZE, DEFAULT_POLL_DURATION_MS, groupId); @@ -160,10 +164,6 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot this.pollDurationMs = pollDurationMs; } - /** - * @deprecated Use the constructor without groupId; the parameter groupId is ignored. - */ - @Deprecated public KafkaIdempotentRepository(String topic, String bootstrapServers, int maxCacheSize, int pollDurationMs, String groupId) { this.topic = topic; @@ -173,10 +173,6 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot this.groupId = groupId; } - /** - * @deprecated Use the constructor without groupId; the parameter groupId is ignored. - */ - @Deprecated public KafkaIdempotentRepository(String topic, Properties consumerConfig, Properties producerConfig, int maxCacheSize, int pollDurationMs, String groupId) { this.topic = topic; @@ -214,6 +210,17 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot this.bootstrapServers = bootstrapServers; } + public boolean isStartupOnly() { + return startupOnly; + } + + /** + * Whether to sync on startup only, or to continue syncing while Camel is running. + */ + public void setStartupOnly(boolean startupOnly) { + this.startupOnly = startupOnly; + } + public Properties getProducerConfig() { return producerConfig; } @@ -225,7 +232,7 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot * <pre> * bootstrap.servers * </pre> - * + * <p> * property itself. Prefer using {@link #bootstrapServers} for default configuration unless you specifically need * non-standard configuration options such as SSL/SASL. * @@ -246,7 +253,7 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot * <pre> * bootstrap.servers * </pre> - * + * <p> * property itself. Prefer using {@link #bootstrapServers} for default configuration unless you specifically need * non-standard configuration options such as SSL/SASL. * @@ -290,21 +297,14 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot this.pollDurationMs = pollDurationMs; } - /** - * @deprecated The parameter groupId is ignored. - */ - @Deprecated public String getGroupId() { return groupId; } /** - * Sets the group id of the Kafka consumer. - * - * @param groupId The poll duration in milliseconds. - * @deprecated The parameter groupId is ignored. + * A string that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the + * same group id, multiple processes can indicate that they are all part of the same consumer group. */ - @Deprecated public void setGroupId(String groupId) { this.groupId = groupId; } @@ -330,6 +330,9 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot consumerConfig = new Properties(); StringHelper.notEmpty(bootstrapServers, "bootstrapServers"); consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + if (groupId != null) { + consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + } } if (producerConfig == null) { @@ -355,7 +358,34 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot producerConfig.putIfAbsent(ProducerConfig.BATCH_SIZE_CONFIG, "0"); producer = new KafkaProducer<>(producerConfig); - populateCache(); + poller = new TopicPoller(); + ServiceHelper.startService(poller); + // populate cache on startup to be ready + StopWatch watch = new StopWatch(); + LOG.info("Syncing KafkaIdempotentRepository from topic: {} starting", topic); + poller.run(); + LOG.info("Syncing KafkaIdempotentRepository from topic: {} complete: {}", topic, + TimeUtils.printDuration(watch.taken(), true)); + + if (!startupOnly) { + // continue sync job in background + executorService + = camelContext.getExecutorServiceManager().newSingleThreadExecutor(this, "KafkaIdempotentRepositorySync"); + LOG.info("Syncing KafkaIdempotentRepository from topic: {} continuously using background thread", topic); + executorService.submit(poller); + } + } + + @Override + protected void doStop() throws Exception { + if (executorService != null && camelContext != null) { + camelContext.getExecutorServiceManager().shutdown(executorService); + executorService = null; + } + ServiceHelper.stopService(poller); + IOHelper.close(consumer, "consumer", LOG); + IOHelper.close(producer, "producer", LOG); + LOG.debug("Stopped KafkaIdempotentRepository. Cache counter: {}", cacheCounter.get()); } private void populateCache() { @@ -363,7 +393,7 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic); Collection<TopicPartition> partitions = partitionInfos.stream() .map(pi -> new TopicPartition(pi.topic(), pi.partition())) - .collect(Collectors.toUnmodifiableList()); + .toList(); LOG.debug("Assigning consumer to partitions {}", partitions); consumer.assign(partitions); @@ -379,38 +409,60 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot addToCache(consumerRecord); } } + } + private class TopicPoller extends ServiceSupport implements Runnable { + + private final AtomicBoolean init = new AtomicBoolean(); + + @Override + public void run() { + if (init.compareAndSet(false, true)) { + // sync cache on startup + LOG.debug("TopicPoller populating cache on startup"); + populateCache(); + LOG.debug("TopicPoller populated cache on startup complete"); + return; + } + + LOG.debug("TopicPoller running"); + while (isRunAllowed()) { + try { + ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(pollDurationMs)); + for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { + addToCache(consumerRecord); + } + } catch (Exception e) { + LOG.warn("TopicPoller error syncing due to: " + e.getMessage() + ". This exception is ignored.", e); + } + } + LOG.debug("TopicPoller stopping"); + } } private void addToCache(ConsumerRecord<String, String> consumerRecord) { - CacheAction action = null; + cacheCounter.incrementAndGet(); + CacheAction action; try { action = CacheAction.valueOf(consumerRecord.value()); + String messageId = consumerRecord.key(); + if (action == CacheAction.add) { + LOG.debug("Adding to cache messageId:{}", messageId); + cache.put(messageId, messageId); + } else if (action == CacheAction.remove) { + LOG.debug("Removing from cache messageId:{}", messageId); + cache.remove(messageId); + } else if (action == CacheAction.clear) { + cache.clear(); + } else { + throw new IllegalArgumentException("Unknown action"); + } } catch (IllegalArgumentException iax) { - LOG.error( - "Unexpected action value:\"{}\" received on [topic:{}, partition:{}, offset:{}]. Shutting down.", + LOG.warn( + "Unexpected action value:\"{}\" received on [topic:{}, partition:{}, offset:{}]. Ignoring.", consumerRecord.key(), consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset()); } - String messageId = consumerRecord.key(); - if (action == CacheAction.add) { - LOG.debug("Adding to cache messageId:{}", messageId); - cache.put(messageId, messageId); - } else if (action == CacheAction.remove) { - LOG.debug("Removing from cache messageId:{}", messageId); - cache.remove(messageId); - } else if (action == CacheAction.clear) { - cache.clear(); - } else { - // this should never happen - throw new RuntimeException("Illegal action " + action + " for key " + consumerRecord.key()); - } - } - - @Override - protected void doStop() { - IOHelper.close(consumer, "consumer", LOG); - IOHelper.close(producer, "producer", LOG); } @Override @@ -444,7 +496,6 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot @Override @ManagedOperation(description = "Does the store contain the given key") public boolean contains(String key) { - LOG.debug("Checking cache for key:{}", key); return cache.containsKey(key); } @@ -468,4 +519,14 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot public void clear() { broadcastAction(null, CacheAction.clear); } + + @ManagedOperation(description = "Number of sync events received from the kafka topic") + public long getCacheCounter() { + return cacheCounter.get(); + } + + @ManagedOperation(description = "Number of elements currently in the cache") + public long getCacheSize() { + return cache != null ? cache.size() : 0; + } }
