This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new ea6885d CAMEL-17186: added support for resume strategies for AWS2 Kinesis
ea6885d is described below
commit ea6885d1c997a6da761d672fcc8dc285b6e551c9
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Wed Nov 10 13:30:54 2021 +0100
CAMEL-17186: added support for resume strategies for AWS2 Kinesis
---
.../aws2/kinesis/Kinesis2ComponentConfigurer.java | 6 +++
.../aws2/kinesis/Kinesis2EndpointConfigurer.java | 6 +++
.../aws2/kinesis/Kinesis2EndpointUriFactory.java | 3 +-
.../camel/component/aws2/kinesis/aws2-kinesis.json | 2 +
.../aws2/kinesis/Kinesis2Configuration.java | 14 +++++++
.../component/aws2/kinesis/Kinesis2Consumer.java | 23 +++++++-----
.../kinesis/consumer/KinesisResumeStrategy.java | 25 +++++++++++++
.../KinesisUserConfigurationResumeStrategy.java | 43 ++++++++++++++++++++++
.../dsl/Aws2KinesisComponentBuilderFactory.java | 19 ++++++++++
.../dsl/Kinesis2EndpointBuilderFactory.java | 36 ++++++++++++++++++
10 files changed, 167 insertions(+), 10 deletions(-)
diff --git
a/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2ComponentConfigurer.java
b/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2ComponentConfigurer.java
index 1a15fdc..97bb77a 100644
---
a/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2ComponentConfigurer.java
+++
b/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2ComponentConfigurer.java
@@ -54,6 +54,8 @@ public class Kinesis2ComponentConfigurer extends
PropertyConfigurerSupport imple
case "proxyprotocol":
case "proxyProtocol":
getOrCreateConfiguration(target).setProxyProtocol(property(camelContext,
software.amazon.awssdk.core.Protocol.class, value)); return true;
case "region":
getOrCreateConfiguration(target).setRegion(property(camelContext,
java.lang.String.class, value)); return true;
+ case "resumestrategy":
+ case "resumeStrategy":
getOrCreateConfiguration(target).setResumeStrategy(property(camelContext,
org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy.class,
value)); return true;
case "secretkey":
case "secretKey":
getOrCreateConfiguration(target).setSecretKey(property(camelContext,
java.lang.String.class, value)); return true;
case "sequencenumber":
@@ -106,6 +108,8 @@ public class Kinesis2ComponentConfigurer extends
PropertyConfigurerSupport imple
case "proxyprotocol":
case "proxyProtocol": return
software.amazon.awssdk.core.Protocol.class;
case "region": return java.lang.String.class;
+ case "resumestrategy":
+ case "resumeStrategy": return
org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy.class;
case "secretkey":
case "secretKey": return java.lang.String.class;
case "sequencenumber":
@@ -154,6 +158,8 @@ public class Kinesis2ComponentConfigurer extends
PropertyConfigurerSupport imple
case "proxyprotocol":
case "proxyProtocol": return
getOrCreateConfiguration(target).getProxyProtocol();
case "region": return getOrCreateConfiguration(target).getRegion();
+ case "resumestrategy":
+ case "resumeStrategy": return
getOrCreateConfiguration(target).getResumeStrategy();
case "secretkey":
case "secretKey": return
getOrCreateConfiguration(target).getSecretKey();
case "sequencenumber":
diff --git
a/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2EndpointConfigurer.java
b/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2EndpointConfigurer.java
index b165e71..00e68d0 100644
---
a/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2EndpointConfigurer.java
+++
b/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2EndpointConfigurer.java
@@ -62,6 +62,8 @@ public class Kinesis2EndpointConfigurer extends
PropertyConfigurerSupport implem
case "region":
target.getConfiguration().setRegion(property(camelContext,
java.lang.String.class, value)); return true;
case "repeatcount":
case "repeatCount": target.setRepeatCount(property(camelContext,
long.class, value)); return true;
+ case "resumestrategy":
+ case "resumeStrategy":
target.getConfiguration().setResumeStrategy(property(camelContext,
org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy.class,
value)); return true;
case "runlogginglevel":
case "runLoggingLevel":
target.setRunLoggingLevel(property(camelContext,
org.apache.camel.LoggingLevel.class, value)); return true;
case "scheduledexecutorservice":
@@ -144,6 +146,8 @@ public class Kinesis2EndpointConfigurer extends
PropertyConfigurerSupport implem
case "region": return java.lang.String.class;
case "repeatcount":
case "repeatCount": return long.class;
+ case "resumestrategy":
+ case "resumeStrategy": return
org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy.class;
case "runlogginglevel":
case "runLoggingLevel": return org.apache.camel.LoggingLevel.class;
case "scheduledexecutorservice":
@@ -222,6 +226,8 @@ public class Kinesis2EndpointConfigurer extends
PropertyConfigurerSupport implem
case "region": return target.getConfiguration().getRegion();
case "repeatcount":
case "repeatCount": return target.getRepeatCount();
+ case "resumestrategy":
+ case "resumeStrategy": return
target.getConfiguration().getResumeStrategy();
case "runlogginglevel":
case "runLoggingLevel": return target.getRunLoggingLevel();
case "scheduledexecutorservice":
diff --git
a/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2EndpointUriFactory.java
b/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2EndpointUriFactory.java
index 1ba2211..5dafce0 100644
---
a/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2EndpointUriFactory.java
+++
b/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2EndpointUriFactory.java
@@ -20,7 +20,7 @@ public class Kinesis2EndpointUriFactory extends
org.apache.camel.support.compone
private static final Set<String> PROPERTY_NAMES;
private static final Set<String> SECRET_PROPERTY_NAMES;
static {
- Set<String> props = new HashSet<>(38);
+ Set<String> props = new HashSet<>(39);
props.add("backoffMultiplier");
props.add("initialDelay");
props.add("scheduler");
@@ -56,6 +56,7 @@ public class Kinesis2EndpointUriFactory extends
org.apache.camel.support.compone
props.add("startScheduler");
props.add("accessKey");
props.add("overrideEndpoint");
+ props.add("resumeStrategy");
props.add("maxResultsPerRequest");
props.add("region");
props.add("exceptionHandler");
diff --git
a/components/camel-aws/camel-aws2-kinesis/src/generated/resources/org/apache/camel/component/aws2/kinesis/aws2-kinesis.json
b/components/camel-aws/camel-aws2-kinesis/src/generated/resources/org/apache/camel/component/aws2/kinesis/aws2-kinesis.json
index bc7981b..51a2f4c 100644
---
a/components/camel-aws/camel-aws2-kinesis/src/generated/resources/org/apache/camel/component/aws2/kinesis/aws2-kinesis.json
+++
b/components/camel-aws/camel-aws2-kinesis/src/generated/resources/org/apache/camel/component/aws2/kinesis/aws2-kinesis.json
@@ -36,6 +36,7 @@
"bridgeErrorHandler": { "kind": "property", "displayName": "Bridge Error
Handler", "group": "consumer", "label": "consumer", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "Allows for bridging the
consumer to the Camel routing Error Handler, which mean any exceptions occurred
while the consumer is trying to pickup incoming messages, or the likes, will
now be processed as a me [...]
"iteratorType": { "kind": "property", "displayName": "Iterator Type",
"group": "consumer", "label": "consumer", "required": false, "type": "object",
"javaType": "software.amazon.awssdk.services.kinesis.model.ShardIteratorType",
"enum": [ "AT_SEQUENCE_NUMBER", "AFTER_SEQUENCE_NUMBER", "TRIM_HORIZON",
"LATEST", "AT_TIMESTAMP", "null" ], "deprecated": false, "autowired": false,
"secret": false, "defaultValue": "TRIM_HORIZON", "configurationClass":
"org.apache.camel.component.aws2.kinesi [...]
"maxResultsPerRequest": { "kind": "property", "displayName": "Max Results
Per Request", "group": "consumer", "label": "consumer", "required": false,
"type": "integer", "javaType": "int", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 1, "configurationClass":
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration",
"configurationField": "configuration", "description": "Maximum number of
records that will be fetched in each poll" },
+ "resumeStrategy": { "kind": "property", "displayName": "Resume Strategy",
"group": "consumer", "label": "consumer", "required": false, "type": "object",
"javaType":
"org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy",
"deprecated": false, "autowired": false, "secret": false, "defaultValue":
"KinesisUserConfigurationResumeStrategy", "configurationClass":
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration",
"configurationField": "configuration", "descri [...]
"sequenceNumber": { "kind": "property", "displayName": "Sequence Number",
"group": "consumer", "label": "consumer", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": false, "configurationClass":
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration",
"configurationField": "configuration", "description": "The sequence number to
start polling from. Required if iteratorType is set to AFTER_SEQUENCE_NUMBER or
[...]
"shardClosed": { "kind": "property", "displayName": "Shard Closed",
"group": "consumer", "label": "consumer", "required": false, "type": "object",
"javaType":
"org.apache.camel.component.aws2.kinesis.Kinesis2ShardClosedStrategyEnum",
"enum": [ "ignore", "fail", "silent" ], "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "ignore", "configurationClass":
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration",
"configurationField": "configuration", "des [...]
"shardId": { "kind": "property", "displayName": "Shard Id", "group":
"consumer", "label": "consumer", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": false, "configurationClass":
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration",
"configurationField": "configuration", "description": "Defines which shardId in
the Kinesis stream to get records from" },
@@ -59,6 +60,7 @@
"bridgeErrorHandler": { "kind": "parameter", "displayName": "Bridge Error
Handler", "group": "consumer", "label": "consumer", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "Allows for bridging the
consumer to the Camel routing Error Handler, which mean any exceptions occurred
while the consumer is trying to pickup incoming messages, or the likes, will
now be processed as a m [...]
"iteratorType": { "kind": "parameter", "displayName": "Iterator Type",
"group": "consumer", "label": "consumer", "required": false, "type": "object",
"javaType": "software.amazon.awssdk.services.kinesis.model.ShardIteratorType",
"enum": [ "AT_SEQUENCE_NUMBER", "AFTER_SEQUENCE_NUMBER", "TRIM_HORIZON",
"LATEST", "AT_TIMESTAMP", "null" ], "deprecated": false, "autowired": false,
"secret": false, "defaultValue": "TRIM_HORIZON", "configurationClass":
"org.apache.camel.component.aws2.kines [...]
"maxResultsPerRequest": { "kind": "parameter", "displayName": "Max Results
Per Request", "group": "consumer", "label": "consumer", "required": false,
"type": "integer", "javaType": "int", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 1, "configurationClass":
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration",
"configurationField": "configuration", "description": "Maximum number of
records that will be fetched in each poll" },
+ "resumeStrategy": { "kind": "parameter", "displayName": "Resume Strategy",
"group": "consumer", "label": "consumer", "required": false, "type": "object",
"javaType":
"org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy",
"deprecated": false, "autowired": false, "secret": false, "defaultValue":
"KinesisUserConfigurationResumeStrategy", "configurationClass":
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration",
"configurationField": "configuration", "descr [...]
"sendEmptyMessageWhenIdle": { "kind": "parameter", "displayName": "Send
Empty Message When Idle", "group": "consumer", "label": "consumer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false, "description": "If
the polling consumer did not poll any files, you can enable this option to send
an empty message (no body) instead." },
"sequenceNumber": { "kind": "parameter", "displayName": "Sequence Number",
"group": "consumer", "label": "consumer", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": false, "configurationClass":
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration",
"configurationField": "configuration", "description": "The sequence number to
start polling from. Required if iteratorType is set to AFTER_SEQUENCE_NUMBER or
[...]
"shardClosed": { "kind": "parameter", "displayName": "Shard Closed",
"group": "consumer", "label": "consumer", "required": false, "type": "object",
"javaType":
"org.apache.camel.component.aws2.kinesis.Kinesis2ShardClosedStrategyEnum",
"enum": [ "ignore", "fail", "silent" ], "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "ignore", "configurationClass":
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration",
"configurationField": "configuration", "de [...]
diff --git
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java
index db57da4..0ded74b 100644
---
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java
+++
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.aws2.kinesis;
import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriParams;
@@ -83,6 +84,11 @@ public class Kinesis2Configuration implements Cloneable {
"static credentials to be passed in.")
private boolean useDefaultCredentialsProvider;
+ @UriParam(label = "consumer",
+ description = "Defines a resume strategy for AWS Kinesis. The
default strategy reads the sequenceNumber if provided",
+ defaultValue = "KinesisUserConfigurationResumeStrategy")
+ private KinesisResumeStrategy resumeStrategy;
+
public KinesisClient getAmazonKinesisClient() {
return amazonKinesisClient;
}
@@ -227,6 +233,14 @@ public class Kinesis2Configuration implements Cloneable {
this.useDefaultCredentialsProvider = useDefaultCredentialsProvider;
}
+ public KinesisResumeStrategy getResumeStrategy() {
+ return resumeStrategy;
+ }
+
+ public void setResumeStrategy(KinesisResumeStrategy resumeStrategy) {
+ this.resumeStrategy = resumeStrategy;
+ }
+
// *************************************************
//
// *************************************************
diff --git
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
index 3446612..44cec89 100644
---
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
+++
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
@@ -23,6 +23,8 @@ import java.util.Queue;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
+import org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy;
+import
org.apache.camel.component.aws2.kinesis.consumer.KinesisUserConfigurationResumeStrategy;
import org.apache.camel.support.ScheduledBatchPollingConsumer;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.ObjectHelper;
@@ -37,7 +39,6 @@ import
software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.Shard;
-import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
public class Kinesis2Consumer extends ScheduledBatchPollingConsumer {
@@ -160,9 +161,7 @@ public class Kinesis2Consumer extends
ScheduledBatchPollingConsumer {
.streamName(getEndpoint().getConfiguration().getStreamName()).shardId(shardId)
.shardIteratorType(getEndpoint().getConfiguration().getIteratorType());
- if (hasSequenceNumber()) {
-
req.startingSequenceNumber(getEndpoint().getConfiguration().getSequenceNumber());
- }
+ resume(req);
GetShardIteratorResponse result =
getClient().getShardIterator(req.build());
currentShardIterator = result.shardIterator();
@@ -171,6 +170,17 @@ public class Kinesis2Consumer extends
ScheduledBatchPollingConsumer {
return currentShardIterator;
}
+ private void resume(GetShardIteratorRequest.Builder req) {
+ KinesisResumeStrategy resumeStrategy;
+ if (getEndpoint().getConfiguration().getResumeStrategy() == null) {
+ resumeStrategy = new
KinesisUserConfigurationResumeStrategy(getEndpoint().getConfiguration());
+ } else {
+ resumeStrategy =
getEndpoint().getConfiguration().getResumeStrategy();
+ }
+
+ resumeStrategy.resume(req);
+ }
+
private Queue<Exchange> createExchanges(List<Record> records) {
Queue<Exchange> exchanges = new ArrayDeque<>();
for (Record record : records) {
@@ -192,9 +202,4 @@ public class Kinesis2Consumer extends
ScheduledBatchPollingConsumer {
return exchange;
}
- private boolean hasSequenceNumber() {
- return !getEndpoint().getConfiguration().getSequenceNumber().isEmpty()
- &&
(getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
- ||
getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER));
- }
}
diff --git
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeStrategy.java
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeStrategy.java
new file mode 100644
index 0000000..7e0e5bc
--- /dev/null
+++
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeStrategy.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.aws2.kinesis.consumer;
+
+import org.apache.camel.ResumeStrategy;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
+
+public interface KinesisResumeStrategy extends
ResumeStrategy<GetShardIteratorRequest.Builder> {
+
+}
diff --git
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisUserConfigurationResumeStrategy.java
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisUserConfigurationResumeStrategy.java
new file mode 100644
index 0000000..4d78990
--- /dev/null
+++
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisUserConfigurationResumeStrategy.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.aws2.kinesis.consumer;
+
+import org.apache.camel.component.aws2.kinesis.Kinesis2Configuration;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+
+public class KinesisUserConfigurationResumeStrategy implements
KinesisResumeStrategy {
+ private final Kinesis2Configuration configuration;
+
+ public KinesisUserConfigurationResumeStrategy(Kinesis2Configuration
configuration) {
+ this.configuration = configuration;
+ }
+
+ private boolean hasSequenceNumber() {
+ return !configuration.getSequenceNumber().isEmpty()
+ &&
(configuration.getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
+ ||
configuration.getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER));
+ }
+
+ @Override
+ public void resume(GetShardIteratorRequest.Builder resumable) {
+ if (hasSequenceNumber()) {
+
resumable.startingSequenceNumber(configuration.getSequenceNumber());
+ }
+ }
+}
diff --git
a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2KinesisComponentBuilderFactory.java
b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2KinesisComponentBuilderFactory.java
index c6ac5e1..061c1c0 100644
---
a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2KinesisComponentBuilderFactory.java
+++
b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2KinesisComponentBuilderFactory.java
@@ -286,6 +286,24 @@ public interface Aws2KinesisComponentBuilderFactory {
return this;
}
/**
+ * Defines a resume strategy for AWS Kinesis. The default strategy
reads
+ * the sequenceNumber if provided.
+ *
+ * The option is a:
+ *
<code>org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy</code>
type.
+ *
+ * Default: KinesisUserConfigurationResumeStrategy
+ * Group: consumer
+ *
+ * @param resumeStrategy the value to set
+ * @return the dsl builder
+ */
+ default Aws2KinesisComponentBuilder resumeStrategy(
+
org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy
resumeStrategy) {
+ doSetProperty("resumeStrategy", resumeStrategy);
+ return this;
+ }
+ /**
* The sequence number to start polling from. Required if iteratorType
* is set to AFTER_SEQUENCE_NUMBER or AT_SEQUENCE_NUMBER.
*
@@ -448,6 +466,7 @@ public interface Aws2KinesisComponentBuilderFactory {
case "bridgeErrorHandler": ((Kinesis2Component)
component).setBridgeErrorHandler((boolean) value); return true;
case "iteratorType": getOrCreateConfiguration((Kinesis2Component)
component).setIteratorType((software.amazon.awssdk.services.kinesis.model.ShardIteratorType)
value); return true;
case "maxResultsPerRequest":
getOrCreateConfiguration((Kinesis2Component)
component).setMaxResultsPerRequest((int) value); return true;
+ case "resumeStrategy":
getOrCreateConfiguration((Kinesis2Component)
component).setResumeStrategy((org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy)
value); return true;
case "sequenceNumber":
getOrCreateConfiguration((Kinesis2Component)
component).setSequenceNumber((java.lang.String) value); return true;
case "shardClosed": getOrCreateConfiguration((Kinesis2Component)
component).setShardClosed((org.apache.camel.component.aws2.kinesis.Kinesis2ShardClosedStrategyEnum)
value); return true;
case "shardId": getOrCreateConfiguration((Kinesis2Component)
component).setShardId((java.lang.String) value); return true;
diff --git
a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Kinesis2EndpointBuilderFactory.java
b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Kinesis2EndpointBuilderFactory.java
index 958879b8..77f6817 100644
---
a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Kinesis2EndpointBuilderFactory.java
+++
b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Kinesis2EndpointBuilderFactory.java
@@ -438,6 +438,42 @@ public interface Kinesis2EndpointBuilderFactory {
return this;
}
/**
+ * Defines a resume strategy for AWS Kinesis. The default strategy
reads
+ * the sequenceNumber if provided.
+ *
+ * The option is a:
+ *
<code>org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy</code>
type.
+ *
+ * Default: KinesisUserConfigurationResumeStrategy
+ * Group: consumer
+ *
+ * @param resumeStrategy the value to set
+ * @return the dsl builder
+ */
+ default Kinesis2EndpointConsumerBuilder resumeStrategy(
+ Object resumeStrategy) {
+ doSetProperty("resumeStrategy", resumeStrategy);
+ return this;
+ }
+ /**
+ * Defines a resume strategy for AWS Kinesis. The default strategy
reads
+ * the sequenceNumber if provided.
+ *
+ * The option will be converted to a
+ *
<code>org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy</code>
type.
+ *
+ * Default: KinesisUserConfigurationResumeStrategy
+ * Group: consumer
+ *
+ * @param resumeStrategy the value to set
+ * @return the dsl builder
+ */
+ default Kinesis2EndpointConsumerBuilder resumeStrategy(
+ String resumeStrategy) {
+ doSetProperty("resumeStrategy", resumeStrategy);
+ return this;
+ }
+ /**
* If the polling consumer did not poll any files, you can enable this
* option to send an empty message (no body) instead.
*