This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 564b0a4  CAMEL-16974: make the resume strategy configurable
564b0a4 is described below

commit 564b0a4e2887b8e52a859766aba950c2707d0e9a
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Wed Sep 22 08:50:31 2021 +0200

    CAMEL-16974: make the resume strategy configurable
    
    As part of this, it also makes it possible to seek to specific offsets
    as requested in CAMEL-13768.
---
 .../component/kafka/KafkaComponentConfigurer.java  |   6 ++
 .../component/kafka/KafkaEndpointConfigurer.java   |   6 ++
 .../component/kafka/KafkaEndpointUriFactory.java   |   3 +-
 .../org/apache/camel/component/kafka/kafka.json    |   2 +
 .../camel/component/kafka/KafkaConfiguration.java  |  24 +++++
 .../consumer/support/OffsetResumeStrategy.java     |  10 +-
 .../support/PartitionAssignmentListener.java       |   9 +-
 .../kafka/consumer/support/ResumeStrategy.java     |  10 +-
 .../consumer/support/ResumeStrategyFactory.java    |  32 ++++--
 .../consumer/support/SeekPolicyResumeStrategy.java |   6 +-
 .../KafkaConsumerWithCustomResumeStrategyIT.java   | 110 +++++++++++++++++++++
 .../dsl/KafkaComponentBuilderFactory.java          |  25 +++++
 .../endpoint/dsl/KafkaEndpointBuilderFactory.java  |  48 +++++++++
 13 files changed, 264 insertions(+), 27 deletions(-)

diff --git 
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
 
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
index cb93742..53926fa 100644
--- 
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
+++ 
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
@@ -160,6 +160,8 @@ public class KafkaComponentConfigurer extends 
PropertyConfigurerSupport implemen
         case "requestRequiredAcks": 
getOrCreateConfiguration(target).setRequestRequiredAcks(property(camelContext, 
java.lang.String.class, value)); return true;
         case "requesttimeoutms":
         case "requestTimeoutMs": 
getOrCreateConfiguration(target).setRequestTimeoutMs(property(camelContext, 
java.lang.Integer.class, value)); return true;
+        case "resumestrategy":
+        case "resumeStrategy": 
getOrCreateConfiguration(target).setResumeStrategy(property(camelContext, 
org.apache.camel.component.kafka.consumer.support.ResumeStrategy.class, 
value)); return true;
         case "retries": 
getOrCreateConfiguration(target).setRetries(property(camelContext, 
java.lang.Integer.class, value)); return true;
         case "retrybackoffms":
         case "retryBackoffMs": 
getOrCreateConfiguration(target).setRetryBackoffMs(property(camelContext, 
java.lang.Integer.class, value)); return true;
@@ -372,6 +374,8 @@ public class KafkaComponentConfigurer extends 
PropertyConfigurerSupport implemen
         case "requestRequiredAcks": return java.lang.String.class;
         case "requesttimeoutms":
         case "requestTimeoutMs": return java.lang.Integer.class;
+        case "resumestrategy":
+        case "resumeStrategy": return 
org.apache.camel.component.kafka.consumer.support.ResumeStrategy.class;
         case "retries": return java.lang.Integer.class;
         case "retrybackoffms":
         case "retryBackoffMs": return java.lang.Integer.class;
@@ -580,6 +584,8 @@ public class KafkaComponentConfigurer extends 
PropertyConfigurerSupport implemen
         case "requestRequiredAcks": return 
getOrCreateConfiguration(target).getRequestRequiredAcks();
         case "requesttimeoutms":
         case "requestTimeoutMs": return 
getOrCreateConfiguration(target).getRequestTimeoutMs();
+        case "resumestrategy":
+        case "resumeStrategy": return 
getOrCreateConfiguration(target).getResumeStrategy();
         case "retries": return getOrCreateConfiguration(target).getRetries();
         case "retrybackoffms":
         case "retryBackoffMs": return 
getOrCreateConfiguration(target).getRetryBackoffMs();
diff --git 
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
 
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
index 1dc0e4f..467b99b 100644
--- 
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
+++ 
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
@@ -148,6 +148,8 @@ public class KafkaEndpointConfigurer extends 
PropertyConfigurerSupport implement
         case "requestRequiredAcks": 
target.getConfiguration().setRequestRequiredAcks(property(camelContext, 
java.lang.String.class, value)); return true;
         case "requesttimeoutms":
         case "requestTimeoutMs": 
target.getConfiguration().setRequestTimeoutMs(property(camelContext, 
java.lang.Integer.class, value)); return true;
+        case "resumestrategy":
+        case "resumeStrategy": 
target.getConfiguration().setResumeStrategy(property(camelContext, 
org.apache.camel.component.kafka.consumer.support.ResumeStrategy.class, 
value)); return true;
         case "retries": 
target.getConfiguration().setRetries(property(camelContext, 
java.lang.Integer.class, value)); return true;
         case "retrybackoffms":
         case "retryBackoffMs": 
target.getConfiguration().setRetryBackoffMs(property(camelContext, 
java.lang.Integer.class, value)); return true;
@@ -348,6 +350,8 @@ public class KafkaEndpointConfigurer extends 
PropertyConfigurerSupport implement
         case "requestRequiredAcks": return java.lang.String.class;
         case "requesttimeoutms":
         case "requestTimeoutMs": return java.lang.Integer.class;
+        case "resumestrategy":
+        case "resumeStrategy": return 
org.apache.camel.component.kafka.consumer.support.ResumeStrategy.class;
         case "retries": return java.lang.Integer.class;
         case "retrybackoffms":
         case "retryBackoffMs": return java.lang.Integer.class;
@@ -549,6 +553,8 @@ public class KafkaEndpointConfigurer extends 
PropertyConfigurerSupport implement
         case "requestRequiredAcks": return 
target.getConfiguration().getRequestRequiredAcks();
         case "requesttimeoutms":
         case "requestTimeoutMs": return 
target.getConfiguration().getRequestTimeoutMs();
+        case "resumestrategy":
+        case "resumeStrategy": return 
target.getConfiguration().getResumeStrategy();
         case "retries": return target.getConfiguration().getRetries();
         case "retrybackoffms":
         case "retryBackoffMs": return 
target.getConfiguration().getRetryBackoffMs();
diff --git 
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java
 
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java
index 989d47d..c9ba787 100644
--- 
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java
+++ 
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java
@@ -20,7 +20,7 @@ public class KafkaEndpointUriFactory extends 
org.apache.camel.support.component.
     private static final Set<String> PROPERTY_NAMES;
     private static final Set<String> SECRET_PROPERTY_NAMES;
     static {
-        Set<String> props = new HashSet<>(100);
+        Set<String> props = new HashSet<>(101);
         props.add("synchronous");
         props.add("queueBufferingMaxMessages");
         props.add("allowManualCommit");
@@ -86,6 +86,7 @@ public class KafkaEndpointUriFactory extends 
org.apache.camel.support.component.
         props.add("lazyStartProducer");
         props.add("sslKeystorePassword");
         props.add("sslEndpointAlgorithm");
+        props.add("resumeStrategy");
         props.add("topic");
         props.add("sslProtocol");
         props.add("sslKeymanagerAlgorithm");
diff --git 
a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
 
b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
index bd15d54..353403d 100644
--- 
a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
+++ 
b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
@@ -55,6 +55,7 @@
     "partitionAssignor": { "kind": "property", "displayName": "Partition 
Assignor", "group": "consumer", "label": "consumer", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": 
"org.apache.kafka.clients.consumer.RangeAssignor", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The class name of the partition assignme [...]
     "pollOnError": { "kind": "property", "displayName": "Poll On Error", 
"group": "consumer", "label": "consumer", "required": false, "type": "object", 
"javaType": "org.apache.camel.component.kafka.PollOnError", "enum": [ 
"DISCARD", "ERROR_HANDLER", "RECONNECT", "RETRY", "STOP" ], "deprecated": 
false, "autowired": false, "secret": false, "defaultValue": "ERROR_HANDLER", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "de [...]
     "pollTimeoutMs": { "kind": "property", "displayName": "Poll Timeout Ms", 
"group": "consumer", "label": "consumer", "required": false, "type": 
"duration", "javaType": "java.lang.Long", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "5000", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The timeout used when polling the 
KafkaConsumer." },
+    "resumeStrategy": { "kind": "property", "displayName": "Resume Strategy", 
"group": "consumer", "label": "consumer", "required": false, "type": "object", 
"javaType": "org.apache.camel.component.kafka.consumer.support.ResumeStrategy", 
"deprecated": false, "autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "This option allows the user to set a custom 
resume strategy. The  [...]
     "seekTo": { "kind": "property", "displayName": "Seek To", "group": 
"consumer", "label": "consumer", "required": false, "type": "string", 
"javaType": "java.lang.String", "enum": [ "beginning", "end" ], "deprecated": 
false, "autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Set if KafkaConsumer will read from beginning 
or end on startup: beginning : read from beginning  [...]
     "sessionTimeoutMs": { "kind": "property", "displayName": "Session Timeout 
Ms", "group": "consumer", "label": "consumer", "required": false, "type": 
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "10000", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The timeout used to detect failures when using 
Kafka's group management facilities." },
     "specificAvroReader": { "kind": "property", "displayName": "Specific Avro 
Reader", "group": "consumer", "label": "confluent,consumer", "required": false, 
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "This enables the use of a specific Avro reader 
for use with the Confluent Platf [...]
@@ -160,6 +161,7 @@
     "partitionAssignor": { "kind": "parameter", "displayName": "Partition 
Assignor", "group": "consumer", "label": "consumer", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": 
"org.apache.kafka.clients.consumer.RangeAssignor", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The class name of the partition assignm [...]
     "pollOnError": { "kind": "parameter", "displayName": "Poll On Error", 
"group": "consumer", "label": "consumer", "required": false, "type": "object", 
"javaType": "org.apache.camel.component.kafka.PollOnError", "enum": [ 
"DISCARD", "ERROR_HANDLER", "RECONNECT", "RETRY", "STOP" ], "deprecated": 
false, "autowired": false, "secret": false, "defaultValue": "ERROR_HANDLER", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "d [...]
     "pollTimeoutMs": { "kind": "parameter", "displayName": "Poll Timeout Ms", 
"group": "consumer", "label": "consumer", "required": false, "type": 
"duration", "javaType": "java.lang.Long", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "5000", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The timeout used when polling the 
KafkaConsumer." },
+    "resumeStrategy": { "kind": "parameter", "displayName": "Resume Strategy", 
"group": "consumer", "label": "consumer", "required": false, "type": "object", 
"javaType": "org.apache.camel.component.kafka.consumer.support.ResumeStrategy", 
"deprecated": false, "autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "This option allows the user to set a custom 
resume strategy. The [...]
     "seekTo": { "kind": "parameter", "displayName": "Seek To", "group": 
"consumer", "label": "consumer", "required": false, "type": "string", 
"javaType": "java.lang.String", "enum": [ "beginning", "end" ], "deprecated": 
false, "autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Set if KafkaConsumer will read from beginning 
or end on startup: beginning : read from beginning [...]
     "sessionTimeoutMs": { "kind": "parameter", "displayName": "Session Timeout 
Ms", "group": "consumer", "label": "consumer", "required": false, "type": 
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "10000", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The timeout used to detect failures when using 
Kafka's group management facilities." },
     "specificAvroReader": { "kind": "parameter", "displayName": "Specific Avro 
Reader", "group": "consumer", "label": "confluent,consumer", "required": false, 
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "This enables the use of a specific Avro reader 
for use with the Confluent Plat [...]
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index d4c4dd1..b357ed9 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -26,6 +26,7 @@ import java.util.stream.Collectors;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.kafka.consumer.support.ResumeStrategy;
 import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer;
 import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderSerializer;
 import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
@@ -147,6 +148,9 @@ public class KafkaConfiguration implements Cloneable, 
HeaderFilterStrategyAware
     @UriParam(label = "consumer", defaultValue = "5000", javaType = 
"java.time.Duration")
     private Long commitTimeoutMs = 5000L;
 
+    @UriParam(label = "consumer")
+    private ResumeStrategy resumeStrategy;
+
     // Producer configuration properties
     @UriParam(label = "producer", defaultValue = 
KafkaConstants.KAFKA_DEFAULT_PARTITIONER)
     private String partitioner = KafkaConstants.KAFKA_DEFAULT_PARTITIONER;
@@ -797,6 +801,26 @@ public class KafkaConfiguration implements Cloneable, 
HeaderFilterStrategyAware
         this.breakOnFirstError = breakOnFirstError;
     }
 
+    public ResumeStrategy getResumeStrategy() {
+        return resumeStrategy;
+    }
+
+    /**
+     * This option allows the user to set a custom resume strategy. The resume 
strategy is executed when partitions are
+     * assigned (i.e.: when connecting or reconnecting). It allows 
implementations to customize how to resume operations
+     * and serve as more flexible alternative to the seekTo and the 
offsetRepository mechanisms.
+     *
+     * See the {@link ResumeStrategy} for implementation details.
+     *
+     * This option does not affect the auto commit setting. It is likely that 
implementations using this setting will
+     * also want to evaluate using the manual commit option along with this.
+     *
+     * @param resumeStrategy An instance of the resume strategy
+     */
+    public void setResumeStrategy(ResumeStrategy resumeStrategy) {
+        this.resumeStrategy = resumeStrategy;
+    }
+
     public String getBrokers() {
         return brokers;
     }
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetResumeStrategy.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetResumeStrategy.java
index 6313337..9788f27 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetResumeStrategy.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetResumeStrategy.java
@@ -34,15 +34,13 @@ import static 
org.apache.camel.component.kafka.consumer.support.KafkaRecordProce
 public class OffsetResumeStrategy implements ResumeStrategy {
     private static final Logger LOG = 
LoggerFactory.getLogger(OffsetResumeStrategy.class);
 
-    private final KafkaConsumer<?, ?> consumer;
     private final StateRepository<String, String> offsetRepository;
 
-    public OffsetResumeStrategy(KafkaConsumer<?, ?> consumer, 
StateRepository<String, String> offsetRepository) {
-        this.consumer = consumer;
+    public OffsetResumeStrategy(StateRepository<String, String> 
offsetRepository) {
         this.offsetRepository = offsetRepository;
     }
 
-    private void resumeFromOffset(TopicPartition topicPartition, String 
offsetState) {
+    private void resumeFromOffset(final KafkaConsumer<?, ?> consumer, 
TopicPartition topicPartition, String offsetState) {
         // The state contains the last read offset, so you need to seek from 
the next one
         long offset = deserializeOffsetValue(offsetState) + 1;
         LOG.debug("Resuming partition {} from offset {} from state", 
topicPartition.partition(), offset);
@@ -50,12 +48,12 @@ public class OffsetResumeStrategy implements ResumeStrategy 
{
     }
 
     @Override
-    public void resume() {
+    public void resume(final KafkaConsumer<?, ?> consumer) {
         Set<TopicPartition> assignments = consumer.assignment();
         for (TopicPartition topicPartition : assignments) {
             String offsetState = 
offsetRepository.getState(serializeOffsetKey(topicPartition));
             if (offsetState != null && !offsetState.isEmpty()) {
-                resumeFromOffset(topicPartition, offsetState);
+                resumeFromOffset(consumer, topicPartition, offsetState);
             }
         }
     }
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
index ce02e37..e07db10 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
@@ -22,7 +22,6 @@ import java.util.Map;
 import java.util.function.Supplier;
 
 import org.apache.camel.component.kafka.KafkaConfiguration;
-import org.apache.camel.spi.StateRepository;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
@@ -52,11 +51,7 @@ public class PartitionAssignmentListener implements 
ConsumerRebalanceListener {
         this.lastProcessedOffset = lastProcessedOffset;
         this.stopStateSupplier = stopStateSupplier;
 
-        StateRepository<String, String> offsetRepository = 
configuration.getOffsetRepository();
-        String seekPolicy = configuration.getSeekTo();
-
-        LOG.info("Performing resume as {} ", seekPolicy);
-        resumeStrategy = ResumeStrategyFactory.newResumeStrategy(consumer, 
offsetRepository, seekPolicy);
+        resumeStrategy = 
ResumeStrategyFactory.newResumeStrategy(configuration);
     }
 
     @Override
@@ -91,6 +86,6 @@ public class PartitionAssignmentListener implements 
ConsumerRebalanceListener {
     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
         LOG.debug("onPartitionsAssigned: {} from topic {}", threadId, 
topicName);
 
-        resumeStrategy.resume();
+        resumeStrategy.resume(consumer);
     }
 }
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategy.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategy.java
index e9b6511..ebe7ed6 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategy.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategy.java
@@ -17,13 +17,19 @@
 
 package org.apache.camel.component.kafka.consumer.support;
 
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+
 /**
  * Defines a strategy for handling resume operations. Implementations can 
define different ways to handle how to resume
  * processing records.
  */
 public interface ResumeStrategy {
     /**
-     * Perform the resume operation
+     * Perform the resume operation. This runs in the scope of the Kafka 
Consumer thread and may run concurrently with
+     * other consumer instances when the component is set up to use more than 
one of them. As such, implementations are
+     * responsible for ensuring the thread-safety of the operations within the 
resume method.
+     *
+     * @param consumer an instance of the KafkaConsumer which is resuming the 
operation
      */
-    void resume();
+    void resume(KafkaConsumer<?, ?> consumer);
 }
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
index 72fece7..44bd588 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
@@ -17,34 +17,52 @@
 
 package org.apache.camel.component.kafka.consumer.support;
 
+import org.apache.camel.component.kafka.KafkaConfiguration;
 import org.apache.camel.spi.StateRepository;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public final class ResumeStrategyFactory {
     /**
      * A NO-OP resume strategy that does nothing (i.e.: no resume)
      */
     private static class NoOpResumeStrategy implements ResumeStrategy {
+        @SuppressWarnings("unused")
         @Override
-        public void resume() {
-
+        public void resume(KafkaConsumer<?, ?> consumer) {
+            // NO-OP
         }
     }
 
     private static final NoOpResumeStrategy NO_OP_RESUME_STRATEGY = new 
NoOpResumeStrategy();
+    private static final Logger LOG = 
LoggerFactory.getLogger(ResumeStrategyFactory.class);
 
     private ResumeStrategyFactory() {
     }
 
-    public static ResumeStrategy newResumeStrategy(
-            KafkaConsumer<?, ?> consumer, StateRepository<String, String> 
offsetRepository,
-            String seekTo) {
+    public static ResumeStrategy newResumeStrategy(KafkaConfiguration 
configuration) {
+
+        if (configuration.getResumeStrategy() != null) {
+            return configuration.getResumeStrategy();
+        }
+
+        return builtinResumeStrategies(configuration);
+    }
+
+    private static ResumeStrategy builtinResumeStrategies(KafkaConfiguration 
configuration) {
+        StateRepository<String, String> offsetRepository = 
configuration.getOffsetRepository();
+        String seekTo = configuration.getSeekTo();
+
         if (offsetRepository != null) {
-            return new OffsetResumeStrategy(consumer, offsetRepository);
+            LOG.info("Using resume from offset strategy");
+            return new OffsetResumeStrategy(offsetRepository);
         } else if (seekTo != null) {
-            return new SeekPolicyResumeStrategy(consumer, seekTo);
+            LOG.info("Using resume from seek policy strategy with seeking from 
{}", seekTo);
+            return new SeekPolicyResumeStrategy(seekTo);
         }
 
+        LOG.info("Using NO-OP resume strategy");
         return NO_OP_RESUME_STRATEGY;
     }
 }
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyResumeStrategy.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyResumeStrategy.java
index d7606df..d58a23a 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyResumeStrategy.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyResumeStrategy.java
@@ -28,15 +28,13 @@ public class SeekPolicyResumeStrategy implements 
ResumeStrategy {
     private static final Logger LOG = 
LoggerFactory.getLogger(SeekPolicyResumeStrategy.class);
 
     private final String seekPolicy;
-    private final KafkaConsumer<?, ?> consumer;
 
-    public SeekPolicyResumeStrategy(KafkaConsumer<?, ?> consumer, String 
seekPolicy) {
+    public SeekPolicyResumeStrategy(String seekPolicy) {
         this.seekPolicy = seekPolicy;
-        this.consumer = consumer;
     }
 
     @Override
-    public void resume() {
+    public void resume(final KafkaConsumer<?, ?> consumer) {
         if (seekPolicy.equals("beginning")) {
             LOG.debug("Seeking from the beginning of topic");
             consumer.seekToBeginning(consumer.assignment());
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithCustomResumeStrategyIT.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithCustomResumeStrategyIT.java
new file mode 100644
index 0000000..21040d0
--- /dev/null
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithCustomResumeStrategyIT.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.consumer.support.ResumeStrategy;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class KafkaConsumerWithCustomResumeStrategyIT extends 
BaseEmbeddedKafkaTestSupport {
+    private static final String TOPIC = "custom-resume";
+
+    @EndpointInject("mock:result")
+    private MockEndpoint result;
+
+    @BindToRegistry("resumeStrategy")
+    private TestResumeStrategy resumeStrategy;
+    private CountDownLatch messagesLatch;
+
+    private static class TestResumeStrategy implements ResumeStrategy {
+        private final CountDownLatch messagesLatch;
+        private boolean resumeCalled;
+        private boolean consumerIsNull = true;
+
+        public TestResumeStrategy(CountDownLatch messagesLatch) {
+            this.messagesLatch = messagesLatch;
+        }
+
+        @Override
+        public void resume(KafkaConsumer<?, ?> consumer) {
+            resumeCalled = true;
+
+            if (consumer != null) {
+                consumerIsNull = false;
+            }
+
+            messagesLatch.countDown();
+        }
+
+        public boolean isResumeCalled() {
+            return resumeCalled;
+        }
+
+        public boolean isConsumerIsNull() {
+            return consumerIsNull;
+        }
+    }
+
+    @Override
+    protected void doPreSetup() {
+        messagesLatch = new CountDownLatch(1);
+        resumeStrategy = new TestResumeStrategy(messagesLatch);
+    }
+
+    @Test
+    @Timeout(value = 30)
+    public void offsetGetStateMustHaveBeenCalledTwice() throws 
InterruptedException {
+        assertTrue(messagesLatch.await(4, TimeUnit.SECONDS), "The resume was 
not called");
+
+        assertTrue(resumeStrategy.isResumeCalled(),
+                "The resume strategy should have been called when the 
partition was assigned");
+        assertFalse(resumeStrategy.isConsumerIsNull(),
+                "The consumer passed to the strategy should not be null");
+    }
+
+    @AfterEach
+    public void after() {
+        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("kafka:" + TOPIC + "?groupId=" + TOPIC + 
"_GROUP&autoCommitIntervalMs=1000"
+                     + "&autoOffsetReset=latest" + "&consumersCount=1"
+                     + "&resumeStrategy=#resumeStrategy")
+                             .routeId("resume-strategy-route")
+                             .to("mock:result");
+            }
+        };
+    }
+}
diff --git 
a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
 
b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
index 182a3d6..941ee86 100644
--- 
a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
+++ 
b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
@@ -674,6 +674,30 @@ public interface KafkaComponentBuilderFactory {
             return this;
         }
         /**
+         * This option allows the user to set a custom resume strategy. The
+         * resume strategy is executed when partitions are assigned (i.e.: when
+         * connecting or reconnecting). It allows implementations to customize
+         * how to resume operations and serves as a more flexible alternative to
+         * the seekTo and the offsetRepository mechanisms. See the
+         * ResumeStrategy for implementation details. This option does not
+         * affect the auto commit setting. It is likely that implementations
+         * using this setting will also want to evaluate using the manual 
commit
+         * option along with this.
+         * 
+         * The option is a:
+         * 
&lt;code&gt;org.apache.camel.component.kafka.consumer.support.ResumeStrategy&lt;/code&gt;
 type.
+         * 
+         * Group: consumer
+         * 
+         * @param resumeStrategy the value to set
+         * @return the dsl builder
+         */
+        default KafkaComponentBuilder resumeStrategy(
+                
org.apache.camel.component.kafka.consumer.support.ResumeStrategy 
resumeStrategy) {
+            doSetProperty("resumeStrategy", resumeStrategy);
+            return this;
+        }
+        /**
          * Set if KafkaConsumer will read from beginning or end on startup:
          * beginning : read from beginning end : read from end This is 
replacing
          * the earlier property seekToBeginning.
@@ -2017,6 +2041,7 @@ public interface KafkaComponentBuilderFactory {
             case "partitionAssignor": 
getOrCreateConfiguration((KafkaComponent) 
component).setPartitionAssignor((java.lang.String) value); return true;
             case "pollOnError": getOrCreateConfiguration((KafkaComponent) 
component).setPollOnError((org.apache.camel.component.kafka.PollOnError) 
value); return true;
             case "pollTimeoutMs": getOrCreateConfiguration((KafkaComponent) 
component).setPollTimeoutMs((java.lang.Long) value); return true;
+            case "resumeStrategy": getOrCreateConfiguration((KafkaComponent) 
component).setResumeStrategy((org.apache.camel.component.kafka.consumer.support.ResumeStrategy)
 value); return true;
             case "seekTo": getOrCreateConfiguration((KafkaComponent) 
component).setSeekTo((java.lang.String) value); return true;
             case "sessionTimeoutMs": getOrCreateConfiguration((KafkaComponent) 
component).setSessionTimeoutMs((java.lang.Integer) value); return true;
             case "specificAvroReader": 
getOrCreateConfiguration((KafkaComponent) 
component).setSpecificAvroReader((boolean) value); return true;
diff --git 
a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
 
b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
index 21066fd..3eaddbd 100644
--- 
a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
+++ 
b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
@@ -1137,6 +1137,54 @@ public interface KafkaEndpointBuilderFactory {
             return this;
         }
         /**
+         * This option allows the user to set a custom resume strategy. The
+         * resume strategy is executed when partitions are assigned (i.e.: when
+         * connecting or reconnecting). It allows implementations to customize
+         * how to resume operations and serves as a more flexible alternative to
+         * the seekTo and the offsetRepository mechanisms. See the
+         * ResumeStrategy for implementation details. This option does not
+         * affect the auto commit setting. It is likely that implementations
+         * using this setting will also want to evaluate using the manual 
commit
+         * option along with this.
+         * 
+         * The option is a:
+         * 
&lt;code&gt;org.apache.camel.component.kafka.consumer.support.ResumeStrategy&lt;/code&gt;
 type.
+         * 
+         * Group: consumer
+         * 
+         * @param resumeStrategy the value to set
+         * @return the dsl builder
+         */
+        default KafkaEndpointConsumerBuilder resumeStrategy(
+                Object resumeStrategy) {
+            doSetProperty("resumeStrategy", resumeStrategy);
+            return this;
+        }
+        /**
+         * This option allows the user to set a custom resume strategy. The
+         * resume strategy is executed when partitions are assigned (i.e.: when
+         * connecting or reconnecting). It allows implementations to customize
+         * how to resume operations and serves as a more flexible alternative to
+         * the seekTo and the offsetRepository mechanisms. See the
+         * ResumeStrategy for implementation details. This option does not
+         * affect the auto commit setting. It is likely that implementations
+         * using this setting will also want to evaluate using the manual 
commit
+         * option along with this.
+         * 
+         * The option will be converted to a
+         * 
&lt;code&gt;org.apache.camel.component.kafka.consumer.support.ResumeStrategy&lt;/code&gt;
 type.
+         * 
+         * Group: consumer
+         * 
+         * @param resumeStrategy the value to set
+         * @return the dsl builder
+         */
+        default KafkaEndpointConsumerBuilder resumeStrategy(
+                String resumeStrategy) {
+            doSetProperty("resumeStrategy", resumeStrategy);
+            return this;
+        }
+        /**
          * Set if KafkaConsumer will read from beginning or end on startup:
          * beginning : read from beginning end : read from end This is 
replacing
          * the earlier property seekToBeginning.

Reply via email to