This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 564b0a4 CAMEL-16974: make the resume strategy configurable
564b0a4 is described below
commit 564b0a4e2887b8e52a859766aba950c2707d0e9a
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Wed Sep 22 08:50:31 2021 +0200
CAMEL-16974: make the resume strategy configurable
As part of this, it also makes it possible to seek to specific offsets
as requested in CAMEL-13768.
---
.../component/kafka/KafkaComponentConfigurer.java | 6 ++
.../component/kafka/KafkaEndpointConfigurer.java | 6 ++
.../component/kafka/KafkaEndpointUriFactory.java | 3 +-
.../org/apache/camel/component/kafka/kafka.json | 2 +
.../camel/component/kafka/KafkaConfiguration.java | 24 +++++
.../consumer/support/OffsetResumeStrategy.java | 10 +-
.../support/PartitionAssignmentListener.java | 9 +-
.../kafka/consumer/support/ResumeStrategy.java | 10 +-
.../consumer/support/ResumeStrategyFactory.java | 32 ++++--
.../consumer/support/SeekPolicyResumeStrategy.java | 6 +-
.../KafkaConsumerWithCustomResumeStrategyIT.java | 110 +++++++++++++++++++++
.../dsl/KafkaComponentBuilderFactory.java | 25 +++++
.../endpoint/dsl/KafkaEndpointBuilderFactory.java | 48 +++++++++
13 files changed, 264 insertions(+), 27 deletions(-)
diff --git
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
index cb93742..53926fa 100644
---
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
+++
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
@@ -160,6 +160,8 @@ public class KafkaComponentConfigurer extends
PropertyConfigurerSupport implemen
case "requestRequiredAcks":
getOrCreateConfiguration(target).setRequestRequiredAcks(property(camelContext,
java.lang.String.class, value)); return true;
case "requesttimeoutms":
case "requestTimeoutMs":
getOrCreateConfiguration(target).setRequestTimeoutMs(property(camelContext,
java.lang.Integer.class, value)); return true;
+ case "resumestrategy":
+ case "resumeStrategy":
getOrCreateConfiguration(target).setResumeStrategy(property(camelContext,
org.apache.camel.component.kafka.consumer.support.ResumeStrategy.class,
value)); return true;
case "retries":
getOrCreateConfiguration(target).setRetries(property(camelContext,
java.lang.Integer.class, value)); return true;
case "retrybackoffms":
case "retryBackoffMs":
getOrCreateConfiguration(target).setRetryBackoffMs(property(camelContext,
java.lang.Integer.class, value)); return true;
@@ -372,6 +374,8 @@ public class KafkaComponentConfigurer extends
PropertyConfigurerSupport implemen
case "requestRequiredAcks": return java.lang.String.class;
case "requesttimeoutms":
case "requestTimeoutMs": return java.lang.Integer.class;
+ case "resumestrategy":
+ case "resumeStrategy": return
org.apache.camel.component.kafka.consumer.support.ResumeStrategy.class;
case "retries": return java.lang.Integer.class;
case "retrybackoffms":
case "retryBackoffMs": return java.lang.Integer.class;
@@ -580,6 +584,8 @@ public class KafkaComponentConfigurer extends
PropertyConfigurerSupport implemen
case "requestRequiredAcks": return
getOrCreateConfiguration(target).getRequestRequiredAcks();
case "requesttimeoutms":
case "requestTimeoutMs": return
getOrCreateConfiguration(target).getRequestTimeoutMs();
+ case "resumestrategy":
+ case "resumeStrategy": return
getOrCreateConfiguration(target).getResumeStrategy();
case "retries": return getOrCreateConfiguration(target).getRetries();
case "retrybackoffms":
case "retryBackoffMs": return
getOrCreateConfiguration(target).getRetryBackoffMs();
diff --git
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
index 1dc0e4f..467b99b 100644
---
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
+++
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
@@ -148,6 +148,8 @@ public class KafkaEndpointConfigurer extends
PropertyConfigurerSupport implement
case "requestRequiredAcks":
target.getConfiguration().setRequestRequiredAcks(property(camelContext,
java.lang.String.class, value)); return true;
case "requesttimeoutms":
case "requestTimeoutMs":
target.getConfiguration().setRequestTimeoutMs(property(camelContext,
java.lang.Integer.class, value)); return true;
+ case "resumestrategy":
+ case "resumeStrategy":
target.getConfiguration().setResumeStrategy(property(camelContext,
org.apache.camel.component.kafka.consumer.support.ResumeStrategy.class,
value)); return true;
case "retries":
target.getConfiguration().setRetries(property(camelContext,
java.lang.Integer.class, value)); return true;
case "retrybackoffms":
case "retryBackoffMs":
target.getConfiguration().setRetryBackoffMs(property(camelContext,
java.lang.Integer.class, value)); return true;
@@ -348,6 +350,8 @@ public class KafkaEndpointConfigurer extends
PropertyConfigurerSupport implement
case "requestRequiredAcks": return java.lang.String.class;
case "requesttimeoutms":
case "requestTimeoutMs": return java.lang.Integer.class;
+ case "resumestrategy":
+ case "resumeStrategy": return
org.apache.camel.component.kafka.consumer.support.ResumeStrategy.class;
case "retries": return java.lang.Integer.class;
case "retrybackoffms":
case "retryBackoffMs": return java.lang.Integer.class;
@@ -549,6 +553,8 @@ public class KafkaEndpointConfigurer extends
PropertyConfigurerSupport implement
case "requestRequiredAcks": return
target.getConfiguration().getRequestRequiredAcks();
case "requesttimeoutms":
case "requestTimeoutMs": return
target.getConfiguration().getRequestTimeoutMs();
+ case "resumestrategy":
+ case "resumeStrategy": return
target.getConfiguration().getResumeStrategy();
case "retries": return target.getConfiguration().getRetries();
case "retrybackoffms":
case "retryBackoffMs": return
target.getConfiguration().getRetryBackoffMs();
diff --git
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java
index 989d47d..c9ba787 100644
---
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java
+++
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java
@@ -20,7 +20,7 @@ public class KafkaEndpointUriFactory extends
org.apache.camel.support.component.
private static final Set<String> PROPERTY_NAMES;
private static final Set<String> SECRET_PROPERTY_NAMES;
static {
- Set<String> props = new HashSet<>(100);
+ Set<String> props = new HashSet<>(101);
props.add("synchronous");
props.add("queueBufferingMaxMessages");
props.add("allowManualCommit");
@@ -86,6 +86,7 @@ public class KafkaEndpointUriFactory extends
org.apache.camel.support.component.
props.add("lazyStartProducer");
props.add("sslKeystorePassword");
props.add("sslEndpointAlgorithm");
+ props.add("resumeStrategy");
props.add("topic");
props.add("sslProtocol");
props.add("sslKeymanagerAlgorithm");
diff --git
a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
index bd15d54..353403d 100644
---
a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
+++
b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
@@ -55,6 +55,7 @@
"partitionAssignor": { "kind": "property", "displayName": "Partition
Assignor", "group": "consumer", "label": "consumer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "defaultValue":
"org.apache.kafka.clients.consumer.RangeAssignor", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The class name of the partition assignme [...]
"pollOnError": { "kind": "property", "displayName": "Poll On Error",
"group": "consumer", "label": "consumer", "required": false, "type": "object",
"javaType": "org.apache.camel.component.kafka.PollOnError", "enum": [
"DISCARD", "ERROR_HANDLER", "RECONNECT", "RETRY", "STOP" ], "deprecated":
false, "autowired": false, "secret": false, "defaultValue": "ERROR_HANDLER",
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration",
"configurationField": "configuration", "de [...]
"pollTimeoutMs": { "kind": "property", "displayName": "Poll Timeout Ms",
"group": "consumer", "label": "consumer", "required": false, "type":
"duration", "javaType": "java.lang.Long", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "5000", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The timeout used when polling the
KafkaConsumer." },
+ "resumeStrategy": { "kind": "property", "displayName": "Resume Strategy",
"group": "consumer", "label": "consumer", "required": false, "type": "object",
"javaType": "org.apache.camel.component.kafka.consumer.support.ResumeStrategy",
"deprecated": false, "autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "This option allows the user to set a custom
resume strategy. The [...]
"seekTo": { "kind": "property", "displayName": "Seek To", "group":
"consumer", "label": "consumer", "required": false, "type": "string",
"javaType": "java.lang.String", "enum": [ "beginning", "end" ], "deprecated":
false, "autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "Set if KafkaConsumer will read from beginning
or end on startup: beginning : read from beginning [...]
"sessionTimeoutMs": { "kind": "property", "displayName": "Session Timeout
Ms", "group": "consumer", "label": "consumer", "required": false, "type":
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "10000", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The timeout used to detect failures when using
Kafka's group management facilities." },
"specificAvroReader": { "kind": "property", "displayName": "Specific Avro
Reader", "group": "consumer", "label": "confluent,consumer", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "This enables the use of a specific Avro reader
for use with the Confluent Platf [...]
@@ -160,6 +161,7 @@
"partitionAssignor": { "kind": "parameter", "displayName": "Partition
Assignor", "group": "consumer", "label": "consumer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "defaultValue":
"org.apache.kafka.clients.consumer.RangeAssignor", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The class name of the partition assignm [...]
"pollOnError": { "kind": "parameter", "displayName": "Poll On Error",
"group": "consumer", "label": "consumer", "required": false, "type": "object",
"javaType": "org.apache.camel.component.kafka.PollOnError", "enum": [
"DISCARD", "ERROR_HANDLER", "RECONNECT", "RETRY", "STOP" ], "deprecated":
false, "autowired": false, "secret": false, "defaultValue": "ERROR_HANDLER",
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration",
"configurationField": "configuration", "d [...]
"pollTimeoutMs": { "kind": "parameter", "displayName": "Poll Timeout Ms",
"group": "consumer", "label": "consumer", "required": false, "type":
"duration", "javaType": "java.lang.Long", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "5000", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The timeout used when polling the
KafkaConsumer." },
+ "resumeStrategy": { "kind": "parameter", "displayName": "Resume Strategy",
"group": "consumer", "label": "consumer", "required": false, "type": "object",
"javaType": "org.apache.camel.component.kafka.consumer.support.ResumeStrategy",
"deprecated": false, "autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "This option allows the user to set a custom
resume strategy. The [...]
"seekTo": { "kind": "parameter", "displayName": "Seek To", "group":
"consumer", "label": "consumer", "required": false, "type": "string",
"javaType": "java.lang.String", "enum": [ "beginning", "end" ], "deprecated":
false, "autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "Set if KafkaConsumer will read from beginning
or end on startup: beginning : read from beginning [...]
"sessionTimeoutMs": { "kind": "parameter", "displayName": "Session Timeout
Ms", "group": "consumer", "label": "consumer", "required": false, "type":
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "10000", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The timeout used to detect failures when using
Kafka's group management facilities." },
"specificAvroReader": { "kind": "parameter", "displayName": "Specific Avro
Reader", "group": "consumer", "label": "confluent,consumer", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "This enables the use of a specific Avro reader
for use with the Confluent Plat [...]
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index d4c4dd1..b357ed9 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -26,6 +26,7 @@ import java.util.stream.Collectors;
import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.kafka.consumer.support.ResumeStrategy;
import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer;
import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderSerializer;
import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
@@ -147,6 +148,9 @@ public class KafkaConfiguration implements Cloneable,
HeaderFilterStrategyAware
@UriParam(label = "consumer", defaultValue = "5000", javaType =
"java.time.Duration")
private Long commitTimeoutMs = 5000L;
+ @UriParam(label = "consumer")
+ private ResumeStrategy resumeStrategy;
+
// Producer configuration properties
@UriParam(label = "producer", defaultValue =
KafkaConstants.KAFKA_DEFAULT_PARTITIONER)
private String partitioner = KafkaConstants.KAFKA_DEFAULT_PARTITIONER;
@@ -797,6 +801,26 @@ public class KafkaConfiguration implements Cloneable,
HeaderFilterStrategyAware
this.breakOnFirstError = breakOnFirstError;
}
+ public ResumeStrategy getResumeStrategy() {
+ return resumeStrategy;
+ }
+
+ /**
+ * This option allows the user to set a custom resume strategy. The resume
strategy is executed when partitions are
+ * assigned (i.e.: when connecting or reconnecting). It allows
implementations to customize how to resume operations
+ * and serve as a more flexible alternative to the seekTo and the
offsetRepository mechanisms.
+ *
+ * See the {@link ResumeStrategy} for implementation details.
+ *
+ * This option does not affect the auto commit setting. It is likely that
implementations using this setting will
+ * also want to evaluate using the manual commit option along with this.
+ *
+ * @param resumeStrategy An instance of the resume strategy
+ */
+ public void setResumeStrategy(ResumeStrategy resumeStrategy) {
+ this.resumeStrategy = resumeStrategy;
+ }
+
public String getBrokers() {
return brokers;
}
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetResumeStrategy.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetResumeStrategy.java
index 6313337..9788f27 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetResumeStrategy.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetResumeStrategy.java
@@ -34,15 +34,13 @@ import static
org.apache.camel.component.kafka.consumer.support.KafkaRecordProce
public class OffsetResumeStrategy implements ResumeStrategy {
private static final Logger LOG =
LoggerFactory.getLogger(OffsetResumeStrategy.class);
- private final KafkaConsumer<?, ?> consumer;
private final StateRepository<String, String> offsetRepository;
- public OffsetResumeStrategy(KafkaConsumer<?, ?> consumer,
StateRepository<String, String> offsetRepository) {
- this.consumer = consumer;
+ public OffsetResumeStrategy(StateRepository<String, String>
offsetRepository) {
this.offsetRepository = offsetRepository;
}
- private void resumeFromOffset(TopicPartition topicPartition, String
offsetState) {
+ private void resumeFromOffset(final KafkaConsumer<?, ?> consumer,
TopicPartition topicPartition, String offsetState) {
// The state contains the last read offset, so you need to seek from
the next one
long offset = deserializeOffsetValue(offsetState) + 1;
LOG.debug("Resuming partition {} from offset {} from state",
topicPartition.partition(), offset);
@@ -50,12 +48,12 @@ public class OffsetResumeStrategy implements ResumeStrategy
{
}
@Override
- public void resume() {
+ public void resume(final KafkaConsumer<?, ?> consumer) {
Set<TopicPartition> assignments = consumer.assignment();
for (TopicPartition topicPartition : assignments) {
String offsetState =
offsetRepository.getState(serializeOffsetKey(topicPartition));
if (offsetState != null && !offsetState.isEmpty()) {
- resumeFromOffset(topicPartition, offsetState);
+ resumeFromOffset(consumer, topicPartition, offsetState);
}
}
}
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
index ce02e37..e07db10 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
@@ -22,7 +22,6 @@ import java.util.Map;
import java.util.function.Supplier;
import org.apache.camel.component.kafka.KafkaConfiguration;
-import org.apache.camel.spi.StateRepository;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
@@ -52,11 +51,7 @@ public class PartitionAssignmentListener implements
ConsumerRebalanceListener {
this.lastProcessedOffset = lastProcessedOffset;
this.stopStateSupplier = stopStateSupplier;
- StateRepository<String, String> offsetRepository =
configuration.getOffsetRepository();
- String seekPolicy = configuration.getSeekTo();
-
- LOG.info("Performing resume as {} ", seekPolicy);
- resumeStrategy = ResumeStrategyFactory.newResumeStrategy(consumer,
offsetRepository, seekPolicy);
+ resumeStrategy =
ResumeStrategyFactory.newResumeStrategy(configuration);
}
@Override
@@ -91,6 +86,6 @@ public class PartitionAssignmentListener implements
ConsumerRebalanceListener {
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
LOG.debug("onPartitionsAssigned: {} from topic {}", threadId,
topicName);
- resumeStrategy.resume();
+ resumeStrategy.resume(consumer);
}
}
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategy.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategy.java
index e9b6511..ebe7ed6 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategy.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategy.java
@@ -17,13 +17,19 @@
package org.apache.camel.component.kafka.consumer.support;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+
/**
* Defines a strategy for handling resume operations. Implementations can
define different ways to handle how to resume
* processing records.
*/
public interface ResumeStrategy {
/**
- * Perform the resume operation
+ * Perform the resume operation. This runs in the scope of the Kafka
Consumer thread and may run concurrently with
+ * other consumer instances when the component is set up to use more than
one of them. As such, implementations are
+ * responsible for ensuring the thread-safety of the operations within the
resume method.
+ *
+ * @param consumer an instance of the KafkaConsumer which is resuming the
operation
*/
- void resume();
+ void resume(KafkaConsumer<?, ?> consumer);
}
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
index 72fece7..44bd588 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
@@ -17,34 +17,52 @@
package org.apache.camel.component.kafka.consumer.support;
+import org.apache.camel.component.kafka.KafkaConfiguration;
import org.apache.camel.spi.StateRepository;
import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public final class ResumeStrategyFactory {
/**
* A NO-OP resume strategy that does nothing (i.e.: no resume)
*/
private static class NoOpResumeStrategy implements ResumeStrategy {
+ @SuppressWarnings("unused")
@Override
- public void resume() {
-
+ public void resume(KafkaConsumer<?, ?> consumer) {
+ // NO-OP
}
}
private static final NoOpResumeStrategy NO_OP_RESUME_STRATEGY = new
NoOpResumeStrategy();
+ private static final Logger LOG =
LoggerFactory.getLogger(ResumeStrategyFactory.class);
private ResumeStrategyFactory() {
}
- public static ResumeStrategy newResumeStrategy(
- KafkaConsumer<?, ?> consumer, StateRepository<String, String>
offsetRepository,
- String seekTo) {
+ public static ResumeStrategy newResumeStrategy(KafkaConfiguration
configuration) {
+
+ if (configuration.getResumeStrategy() != null) {
+ return configuration.getResumeStrategy();
+ }
+
+ return builtinResumeStrategies(configuration);
+ }
+
+ private static ResumeStrategy builtinResumeStrategies(KafkaConfiguration
configuration) {
+ StateRepository<String, String> offsetRepository =
configuration.getOffsetRepository();
+ String seekTo = configuration.getSeekTo();
+
if (offsetRepository != null) {
- return new OffsetResumeStrategy(consumer, offsetRepository);
+ LOG.info("Using resume from offset strategy");
+ return new OffsetResumeStrategy(offsetRepository);
} else if (seekTo != null) {
- return new SeekPolicyResumeStrategy(consumer, seekTo);
+ LOG.info("Using resume from seek policy strategy with seeking from
{}", seekTo);
+ return new SeekPolicyResumeStrategy(seekTo);
}
+ LOG.info("Using NO-OP resume strategy");
return NO_OP_RESUME_STRATEGY;
}
}
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyResumeStrategy.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyResumeStrategy.java
index d7606df..d58a23a 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyResumeStrategy.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyResumeStrategy.java
@@ -28,15 +28,13 @@ public class SeekPolicyResumeStrategy implements
ResumeStrategy {
private static final Logger LOG =
LoggerFactory.getLogger(SeekPolicyResumeStrategy.class);
private final String seekPolicy;
- private final KafkaConsumer<?, ?> consumer;
- public SeekPolicyResumeStrategy(KafkaConsumer<?, ?> consumer, String
seekPolicy) {
+ public SeekPolicyResumeStrategy(String seekPolicy) {
this.seekPolicy = seekPolicy;
- this.consumer = consumer;
}
@Override
- public void resume() {
+ public void resume(final KafkaConsumer<?, ?> consumer) {
if (seekPolicy.equals("beginning")) {
LOG.debug("Seeking from the beginning of topic");
consumer.seekToBeginning(consumer.assignment());
diff --git
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithCustomResumeStrategyIT.java
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithCustomResumeStrategyIT.java
new file mode 100644
index 0000000..21040d0
--- /dev/null
+++
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithCustomResumeStrategyIT.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.consumer.support.ResumeStrategy;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class KafkaConsumerWithCustomResumeStrategyIT extends
BaseEmbeddedKafkaTestSupport {
+ private static final String TOPIC = "custom-resume";
+
+ @EndpointInject("mock:result")
+ private MockEndpoint result;
+
+ @BindToRegistry("resumeStrategy")
+ private TestResumeStrategy resumeStrategy;
+ private CountDownLatch messagesLatch;
+
+ private static class TestResumeStrategy implements ResumeStrategy {
+ private final CountDownLatch messagesLatch;
+ private boolean resumeCalled;
+ private boolean consumerIsNull = true;
+
+ public TestResumeStrategy(CountDownLatch messagesLatch) {
+ this.messagesLatch = messagesLatch;
+ }
+
+ @Override
+ public void resume(KafkaConsumer<?, ?> consumer) {
+ resumeCalled = true;
+
+ if (consumer != null) {
+ consumerIsNull = false;
+ }
+
+ messagesLatch.countDown();
+ }
+
+ public boolean isResumeCalled() {
+ return resumeCalled;
+ }
+
+ public boolean isConsumerIsNull() {
+ return consumerIsNull;
+ }
+ }
+
+ @Override
+ protected void doPreSetup() {
+ messagesLatch = new CountDownLatch(1);
+ resumeStrategy = new TestResumeStrategy(messagesLatch);
+ }
+
+ @Test
+ @Timeout(value = 30)
+ public void offsetGetStateMustHaveBeenCalledTwice() throws
InterruptedException {
+ assertTrue(messagesLatch.await(4, TimeUnit.SECONDS), "The resume was
not called");
+
+ assertTrue(resumeStrategy.isResumeCalled(),
+ "The resume strategy should have been called when the
partition was assigned");
+ assertFalse(resumeStrategy.isConsumerIsNull(),
+ "The consumer passed to the strategy should not be null");
+ }
+
+ @AfterEach
+ public void after() {
+ kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC));
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("kafka:" + TOPIC + "?groupId=" + TOPIC +
"_GROUP&autoCommitIntervalMs=1000"
+ + "&autoOffsetReset=latest" + "&consumersCount=1"
+ + "&resumeStrategy=#resumeStrategy")
+ .routeId("resume-strategy-route")
+ .to("mock:result");
+ }
+ };
+ }
+}
diff --git
a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
index 182a3d6..941ee86 100644
---
a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
+++
b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
@@ -674,6 +674,30 @@ public interface KafkaComponentBuilderFactory {
return this;
}
/**
+ * This option allows the user to set a custom resume strategy. The
+ * resume strategy is executed when partitions are assigned (i.e.: when
+ * connecting or reconnecting). It allows implementations to customize
+ * how to resume operations and serve as a more flexible alternative to
+ * the seekTo and the offsetRepository mechanisms. See the
+ * ResumeStrategy for implementation details. This option does not
+ * affect the auto commit setting. It is likely that implementations
+ * using this setting will also want to evaluate using the manual
commit
+ * option along with this.
+ *
+ * The option is a:
+ *
<code>org.apache.camel.component.kafka.consumer.support.ResumeStrategy</code>
type.
+ *
+ * Group: consumer
+ *
+ * @param resumeStrategy the value to set
+ * @return the dsl builder
+ */
+ default KafkaComponentBuilder resumeStrategy(
+
org.apache.camel.component.kafka.consumer.support.ResumeStrategy
resumeStrategy) {
+ doSetProperty("resumeStrategy", resumeStrategy);
+ return this;
+ }
+ /**
* Set if KafkaConsumer will read from beginning or end on startup:
* beginning : read from beginning end : read from end This is
replacing
* the earlier property seekToBeginning.
@@ -2017,6 +2041,7 @@ public interface KafkaComponentBuilderFactory {
case "partitionAssignor":
getOrCreateConfiguration((KafkaComponent)
component).setPartitionAssignor((java.lang.String) value); return true;
case "pollOnError": getOrCreateConfiguration((KafkaComponent)
component).setPollOnError((org.apache.camel.component.kafka.PollOnError)
value); return true;
case "pollTimeoutMs": getOrCreateConfiguration((KafkaComponent)
component).setPollTimeoutMs((java.lang.Long) value); return true;
+ case "resumeStrategy": getOrCreateConfiguration((KafkaComponent)
component).setResumeStrategy((org.apache.camel.component.kafka.consumer.support.ResumeStrategy)
value); return true;
case "seekTo": getOrCreateConfiguration((KafkaComponent)
component).setSeekTo((java.lang.String) value); return true;
case "sessionTimeoutMs": getOrCreateConfiguration((KafkaComponent)
component).setSessionTimeoutMs((java.lang.Integer) value); return true;
case "specificAvroReader":
getOrCreateConfiguration((KafkaComponent)
component).setSpecificAvroReader((boolean) value); return true;
diff --git
a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
index 21066fd..3eaddbd 100644
---
a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
+++
b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
@@ -1137,6 +1137,54 @@ public interface KafkaEndpointBuilderFactory {
return this;
}
/**
+ * This option allows the user to set a custom resume strategy. The
+ * resume strategy is executed when partitions are assigned (i.e.: when
+ * connecting or reconnecting). It allows implementations to customize
+ * how to resume operations and serve as a more flexible alternative to
+ * the seekTo and the offsetRepository mechanisms. See the
+ * ResumeStrategy for implementation details. This option does not
+ * affect the auto commit setting. It is likely that implementations
+ * using this setting will also want to evaluate using the manual
commit
+ * option along with this.
+ *
+ * The option is a:
+ *
<code>org.apache.camel.component.kafka.consumer.support.ResumeStrategy</code>
type.
+ *
+ * Group: consumer
+ *
+ * @param resumeStrategy the value to set
+ * @return the dsl builder
+ */
+ default KafkaEndpointConsumerBuilder resumeStrategy(
+ Object resumeStrategy) {
+ doSetProperty("resumeStrategy", resumeStrategy);
+ return this;
+ }
+ /**
+ * This option allows the user to set a custom resume strategy. The
+ * resume strategy is executed when partitions are assigned (i.e.: when
+ * connecting or reconnecting). It allows implementations to customize
+ * how to resume operations and serve as a more flexible alternative to
+ * the seekTo and the offsetRepository mechanisms. See the
+ * ResumeStrategy for implementation details. This option does not
+ * affect the auto commit setting. It is likely that implementations
+ * using this setting will also want to evaluate using the manual
commit
+ * option along with this.
+ *
+ * The option will be converted to a
+ *
<code>org.apache.camel.component.kafka.consumer.support.ResumeStrategy</code>
type.
+ *
+ * Group: consumer
+ *
+ * @param resumeStrategy the value to set
+ * @return the dsl builder
+ */
+ default KafkaEndpointConsumerBuilder resumeStrategy(
+ String resumeStrategy) {
+ doSetProperty("resumeStrategy", resumeStrategy);
+ return this;
+ }
+ /**
* Set if KafkaConsumer will read from beginning or end on startup:
* beginning : read from beginning end : read from end This is
replacing
* the earlier property seekToBeginning.