This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 096c3c085c2 CAMEL-20958: camel-aws - Kinesis consumer loops on closed 
shards (#15230)
096c3c085c2 is described below

commit 096c3c085c20927f8e7deaf33f0764b203aec3ce
Author: Claus Ibsen <[email protected]>
AuthorDate: Wed Aug 21 08:21:16 2024 +0200

    CAMEL-20958: camel-aws - Kinesis consumer loops on closed shards (#15230)
    
    CAMEL-20958: camel-aws - Kinesis consumer loops on closed shards
---
 .../camel/catalog/components/aws2-kinesis.json     |  4 +--
 .../camel/component/aws2/kinesis/aws2-kinesis.json |  4 +--
 .../aws2/kinesis/Kinesis2Configuration.java        |  6 ++--
 .../component/aws2/kinesis/Kinesis2Consumer.java   | 24 ++++++++++---
 .../KinesisConsumerClosedShardWithSilentTest.java  | 39 +++++++++++++---------
 .../dsl/Aws2KinesisComponentBuilderFactory.java    | 10 +++---
 .../dsl/Kinesis2EndpointBuilderFactory.java        | 20 +++++------
 .../camel/kotlin/components/Aws2KinesisUriDsl.kt   |  6 ++--
 8 files changed, 68 insertions(+), 45 deletions(-)

diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-kinesis.json
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-kinesis.json
index 90713f67676..5f46322e0b6 100644
--- 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-kinesis.json
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-kinesis.json
@@ -32,7 +32,7 @@
     "iteratorType": { "index": 6, "kind": "property", "displayName": "Iterator 
Type", "group": "consumer", "label": "consumer", "required": false, "type": 
"object", "javaType": 
"software.amazon.awssdk.services.kinesis.model.ShardIteratorType", "enum": [ 
"AT_SEQUENCE_NUMBER", "AFTER_SEQUENCE_NUMBER", "TRIM_HORIZON", "LATEST", 
"AT_TIMESTAMP", "null" ], "deprecated": false, "autowired": false, "secret": 
false, "defaultValue": "TRIM_HORIZON", "configurationClass": 
"org.apache.camel.component [...]
     "maxResultsPerRequest": { "index": 7, "kind": "property", "displayName": 
"Max Results Per Request", "group": "consumer", "label": "consumer", 
"required": false, "type": "integer", "javaType": "int", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": 1, "configurationClass": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", 
"configurationField": "configuration", "description": "Maximum number of 
records that will be fetched in each poll" },
     "sequenceNumber": { "index": 8, "kind": "property", "displayName": 
"Sequence Number", "group": "consumer", "label": "consumer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", 
"configurationField": "configuration", "description": "The sequence number to 
start polling from. Required if iteratorType is set to AFTER_SEQUENC [...]
-    "shardClosed": { "index": 9, "kind": "property", "displayName": "Shard 
Closed", "group": "consumer", "label": "consumer", "required": false, "type": 
"object", "javaType": 
"org.apache.camel.component.aws2.kinesis.Kinesis2ShardClosedStrategyEnum", 
"enum": [ "ignore", "fail", "silent" ], "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "ignore", "configurationClass": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", 
"configurationField": "configur [...]
+    "shardClosed": { "index": 9, "kind": "property", "displayName": "Shard 
Closed", "group": "consumer", "label": "consumer", "required": false, "type": 
"object", "javaType": 
"org.apache.camel.component.aws2.kinesis.Kinesis2ShardClosedStrategyEnum", 
"enum": [ "ignore", "fail", "silent" ], "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "ignore", "configurationClass": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", 
"configurationField": "configur [...]
     "shardId": { "index": 10, "kind": "property", "displayName": "Shard Id", 
"group": "consumer", "label": "consumer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", 
"configurationField": "configuration", "description": "Defines which shardId in 
the Kinesis stream to get records from" },
     "shardMonitorInterval": { "index": 11, "kind": "property", "displayName": 
"Shard Monitor Interval", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "integer", "javaType": "long", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 
10000, "configurationClass": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", 
"configurationField": "configuration", "description": "The interval in 
milliseconds to wait between  [...]
     "lazyStartProducer": { "index": 12, "kind": "property", "displayName": 
"Lazy Start Producer", "group": "producer", "label": "producer", "required": 
false, "type": "boolean", "javaType": "boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": false, "description": 
"Whether the producer should be started lazy (on the first message). By 
starting lazy you can use this to allow CamelContext and routes to startup in 
situations where a producer may otherwise fai [...]
@@ -74,7 +74,7 @@
     "maxResultsPerRequest": { "index": 6, "kind": "parameter", "displayName": 
"Max Results Per Request", "group": "consumer", "label": "consumer", 
"required": false, "type": "integer", "javaType": "int", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": 1, "configurationClass": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", 
"configurationField": "configuration", "description": "Maximum number of 
records that will be fetched in each poll" },
     "sendEmptyMessageWhenIdle": { "index": 7, "kind": "parameter", 
"displayName": "Send Empty Message When Idle", "group": "consumer", "label": 
"consumer", "required": false, "type": "boolean", "javaType": "boolean", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 
false, "description": "If the polling consumer did not poll any files, you can 
enable this option to send an empty message (no body) instead." },
     "sequenceNumber": { "index": 8, "kind": "parameter", "displayName": 
"Sequence Number", "group": "consumer", "label": "consumer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", 
"configurationField": "configuration", "description": "The sequence number to 
start polling from. Required if iteratorType is set to AFTER_SEQUEN [...]
-    "shardClosed": { "index": 9, "kind": "parameter", "displayName": "Shard 
Closed", "group": "consumer", "label": "consumer", "required": false, "type": 
"object", "javaType": 
"org.apache.camel.component.aws2.kinesis.Kinesis2ShardClosedStrategyEnum", 
"enum": [ "ignore", "fail", "silent" ], "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "ignore", "configurationClass": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", 
"configurationField": "configu [...]
+    "shardClosed": { "index": 9, "kind": "parameter", "displayName": "Shard 
Closed", "group": "consumer", "label": "consumer", "required": false, "type": 
"object", "javaType": 
"org.apache.camel.component.aws2.kinesis.Kinesis2ShardClosedStrategyEnum", 
"enum": [ "ignore", "fail", "silent" ], "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "ignore", "configurationClass": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", 
"configurationField": "configu [...]
     "shardId": { "index": 10, "kind": "parameter", "displayName": "Shard Id", 
"group": "consumer", "label": "consumer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", 
"configurationField": "configuration", "description": "Defines which shardId in 
the Kinesis stream to get records from" },
     "bridgeErrorHandler": { "index": 11, "kind": "parameter", "displayName": 
"Bridge Error Handler", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "boolean", "javaType": 
"boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Allows for bridging the consumer to the 
Camel routing Error Handler, which mean any exceptions (if possible) occurred 
while the Camel consumer is trying to pickup incoming [...]
     "exceptionHandler": { "index": 12, "kind": "parameter", "displayName": 
"Exception Handler", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "object", "javaType": 
"org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", 
"deprecated": false, "autowired": false, "secret": false, "description": "To 
let the consumer use a custom ExceptionHandler. Notice if the option 
bridgeErrorHandler is enabled then this option is not in use. By de [...]
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/generated/resources/META-INF/org/apache/camel/component/aws2/kinesis/aws2-kinesis.json
 
b/components/camel-aws/camel-aws2-kinesis/src/generated/resources/META-INF/org/apache/camel/component/aws2/kinesis/aws2-kinesis.json
index 90713f67676..5f46322e0b6 100644
--- 
a/components/camel-aws/camel-aws2-kinesis/src/generated/resources/META-INF/org/apache/camel/component/aws2/kinesis/aws2-kinesis.json
+++ 
b/components/camel-aws/camel-aws2-kinesis/src/generated/resources/META-INF/org/apache/camel/component/aws2/kinesis/aws2-kinesis.json
@@ -32,7 +32,7 @@
     "iteratorType": { "index": 6, "kind": "property", "displayName": "Iterator 
Type", "group": "consumer", "label": "consumer", "required": false, "type": 
"object", "javaType": 
"software.amazon.awssdk.services.kinesis.model.ShardIteratorType", "enum": [ 
"AT_SEQUENCE_NUMBER", "AFTER_SEQUENCE_NUMBER", "TRIM_HORIZON", "LATEST", 
"AT_TIMESTAMP", "null" ], "deprecated": false, "autowired": false, "secret": 
false, "defaultValue": "TRIM_HORIZON", "configurationClass": 
"org.apache.camel.component [...]
     "maxResultsPerRequest": { "index": 7, "kind": "property", "displayName": 
"Max Results Per Request", "group": "consumer", "label": "consumer", 
"required": false, "type": "integer", "javaType": "int", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": 1, "configurationClass": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", 
"configurationField": "configuration", "description": "Maximum number of 
records that will be fetched in each poll" },
     "sequenceNumber": { "index": 8, "kind": "property", "displayName": 
"Sequence Number", "group": "consumer", "label": "consumer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", 
"configurationField": "configuration", "description": "The sequence number to 
start polling from. Required if iteratorType is set to AFTER_SEQUENC [...]
-    "shardClosed": { "index": 9, "kind": "property", "displayName": "Shard 
Closed", "group": "consumer", "label": "consumer", "required": false, "type": 
"object", "javaType": 
"org.apache.camel.component.aws2.kinesis.Kinesis2ShardClosedStrategyEnum", 
"enum": [ "ignore", "fail", "silent" ], "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "ignore", "configurationClass": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", 
"configurationField": "configur [...]
+    "shardClosed": { "index": 9, "kind": "property", "displayName": "Shard 
Closed", "group": "consumer", "label": "consumer", "required": false, "type": 
"object", "javaType": 
"org.apache.camel.component.aws2.kinesis.Kinesis2ShardClosedStrategyEnum", 
"enum": [ "ignore", "fail", "silent" ], "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "ignore", "configurationClass": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", 
"configurationField": "configur [...]
     "shardId": { "index": 10, "kind": "property", "displayName": "Shard Id", 
"group": "consumer", "label": "consumer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", 
"configurationField": "configuration", "description": "Defines which shardId in 
the Kinesis stream to get records from" },
     "shardMonitorInterval": { "index": 11, "kind": "property", "displayName": 
"Shard Monitor Interval", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "integer", "javaType": "long", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 
10000, "configurationClass": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", 
"configurationField": "configuration", "description": "The interval in 
milliseconds to wait between  [...]
     "lazyStartProducer": { "index": 12, "kind": "property", "displayName": 
"Lazy Start Producer", "group": "producer", "label": "producer", "required": 
false, "type": "boolean", "javaType": "boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": false, "description": 
"Whether the producer should be started lazy (on the first message). By 
starting lazy you can use this to allow CamelContext and routes to startup in 
situations where a producer may otherwise fai [...]
@@ -74,7 +74,7 @@
     "maxResultsPerRequest": { "index": 6, "kind": "parameter", "displayName": 
"Max Results Per Request", "group": "consumer", "label": "consumer", 
"required": false, "type": "integer", "javaType": "int", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": 1, "configurationClass": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", 
"configurationField": "configuration", "description": "Maximum number of 
records that will be fetched in each poll" },
     "sendEmptyMessageWhenIdle": { "index": 7, "kind": "parameter", 
"displayName": "Send Empty Message When Idle", "group": "consumer", "label": 
"consumer", "required": false, "type": "boolean", "javaType": "boolean", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 
false, "description": "If the polling consumer did not poll any files, you can 
enable this option to send an empty message (no body) instead." },
     "sequenceNumber": { "index": 8, "kind": "parameter", "displayName": 
"Sequence Number", "group": "consumer", "label": "consumer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", 
"configurationField": "configuration", "description": "The sequence number to 
start polling from. Required if iteratorType is set to AFTER_SEQUEN [...]
-    "shardClosed": { "index": 9, "kind": "parameter", "displayName": "Shard 
Closed", "group": "consumer", "label": "consumer", "required": false, "type": 
"object", "javaType": 
"org.apache.camel.component.aws2.kinesis.Kinesis2ShardClosedStrategyEnum", 
"enum": [ "ignore", "fail", "silent" ], "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "ignore", "configurationClass": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", 
"configurationField": "configu [...]
+    "shardClosed": { "index": 9, "kind": "parameter", "displayName": "Shard 
Closed", "group": "consumer", "label": "consumer", "required": false, "type": 
"object", "javaType": 
"org.apache.camel.component.aws2.kinesis.Kinesis2ShardClosedStrategyEnum", 
"enum": [ "ignore", "fail", "silent" ], "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "ignore", "configurationClass": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", 
"configurationField": "configu [...]
     "shardId": { "index": 10, "kind": "parameter", "displayName": "Shard Id", 
"group": "consumer", "label": "consumer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", 
"configurationField": "configuration", "description": "Defines which shardId in 
the Kinesis stream to get records from" },
     "bridgeErrorHandler": { "index": 11, "kind": "parameter", "displayName": 
"Bridge Error Handler", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "boolean", "javaType": 
"boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Allows for bridging the consumer to the 
Camel routing Error Handler, which mean any exceptions (if possible) occurred 
while the Camel consumer is trying to pickup incoming [...]
     "exceptionHandler": { "index": 12, "kind": "parameter", "displayName": 
"Exception Handler", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "object", "javaType": 
"org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", 
"deprecated": false, "autowired": false, "secret": false, "description": "To 
let the consumer use a custom ExceptionHandler. Notice if the option 
bridgeErrorHandler is enabled then this option is not in use. By de [...]
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java
 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java
index 6ba5d24d338..ae2b1ed1682 100644
--- 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java
+++ 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java
@@ -60,9 +60,9 @@ public class Kinesis2Configuration implements Cloneable {
     private String sequenceNumber = "";
     @UriParam(label = "consumer", defaultValue = "ignore",
               description = "Define what will be the behavior in case of shard 
closed. Possible value are ignore, silent and fail."
-                            + " In case of ignore a message will be logged and 
the consumer will restart from the beginning,"
-                            + "in case of silent there will be no logging and 
the consumer will start from the beginning,"
-                            + "in case of fail a ReachedClosedStateException 
will be raised")
+                            + " In case of ignore a WARN message will be 
logged once and the consumer will not process new messages until restarted,"
+                            + "in case of silent there will be no logging and 
the consumer will not process new messages until restarted,"
+                            + "in case of fail a ReachedClosedStateException 
will be thrown")
     private Kinesis2ShardClosedStrategyEnum shardClosed;
     @UriParam(label = "proxy", enums = "HTTP,HTTPS", defaultValue = "HTTPS",
               description = "To define a proxy protocol when instantiating the 
Kinesis client")
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
index c33cedab3ba..85fa04ecc25 100644
--- 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
+++ 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
@@ -53,6 +53,7 @@ public class Kinesis2Consumer extends 
ScheduledBatchPollingConsumer implements R
     private ResumeStrategy resumeStrategy;
 
     private final Map<String, String> currentShardIterators = new 
java.util.HashMap<>();
+    private final Set<String> warnLogged = new HashSet<>();
 
     private volatile List<Shard> currentShardList = List.of();
     private static final String SHARD_MONITOR_EXECUTOR_NAME = 
"Kinesis_shard_monitor";
@@ -71,11 +72,23 @@ public class Kinesis2Consumer extends 
ScheduledBatchPollingConsumer implements R
         this.connection = connection;
     }
 
+    public boolean isShardClosed(String shardId) {
+        return currentShardIterators.get(shardId) == null && 
currentShardIterators.containsKey(shardId);
+    }
+
     @Override
     protected int poll() throws Exception {
         var processedExchangeCount = new AtomicInteger(0);
 
-        if (!getEndpoint().getConfiguration().getShardId().isEmpty()) {
+        String shardId = getEndpoint().getConfiguration().getShardId();
+        if (!shardId.isEmpty()) {
+            // skip if the shard is closed
+            if (isShardClosed(shardId)) {
+                // There was previously a shardIterator but shard is now closed
+                handleClosedShard(shardId);
+                return 0;
+            }
+
             var request = DescribeStreamRequest
                     .builder()
                     
.streamName(getEndpoint().getConfiguration().getStreamName())
@@ -129,7 +142,7 @@ public class Kinesis2Consumer extends 
ScheduledBatchPollingConsumer implements R
             final Shard shard,
             final KinesisConnection kinesisConnection,
             AtomicInteger processedExchangeCount) {
-        String shardIterator = null;
+        String shardIterator;
         try {
             shardIterator = getShardIterator(shard, kinesisConnection);
         } catch (InterruptedException e) {
@@ -222,6 +235,7 @@ public class Kinesis2Consumer extends 
ScheduledBatchPollingConsumer implements R
             if (currentShardIterators.containsKey(shardId)) {
                 // There was previously a shardIterator but shard is now closed
                 handleClosedShard(shardId);
+                return null; // we cannot get the shard again as its closed
             }
 
             GetShardIteratorRequest.Builder request = 
GetShardIteratorRequest.builder()
@@ -264,8 +278,10 @@ public class Kinesis2Consumer extends 
ScheduledBatchPollingConsumer implements R
     private void handleClosedShard(String shardId) {
         switch (getEndpoint().getConfiguration().getShardClosed()) {
             case ignore:
-                LOG.warn("The shard with id={} on stream {} reached CLOSE 
status",
-                        shardId, 
getEndpoint().getConfiguration().getStreamName());
+                if (warnLogged.add(shardId)) {
+                    LOG.warn("The shard with id={} on stream {} reached CLOSE 
status",
+                            shardId, 
getEndpoint().getConfiguration().getStreamName());
+                }
                 break;
             case silent:
                 break;
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
 
b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
index 643bb199ad9..3c7303b8b2a 100644
--- 
a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
+++ 
b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelContext;
 import org.apache.camel.impl.DefaultCamelContext;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -70,22 +71,23 @@ public class KinesisConsumerClosedShardWithSilentTest {
 
     @BeforeEach
     public void setup() {
-        SequenceNumberRange range = 
SequenceNumberRange.builder().endingSequenceNumber("20").build();
+        SequenceNumberRange range = 
SequenceNumberRange.builder().endingSequenceNumber("2").build();
         Shard shard = 
Shard.builder().shardId("shardId").sequenceNumberRange(range).build();
         ArrayList<Shard> shardList = new ArrayList<>();
         shardList.add(shard);
 
+        var r1 = GetRecordsResponse.builder()
+                .nextShardIterator(null)
+                .records(
+                        Record.builder().sequenceNumber("1")
+                                .data(SdkBytes.fromString("Hello", 
Charset.defaultCharset()))
+                                .build(),
+                        Record.builder().sequenceNumber("2")
+                                .data(SdkBytes.fromString("Bye", 
Charset.defaultCharset()))
+                                .build())
+                .build();
         when(kinesisClient
-                
.getRecords(any(GetRecordsRequest.class))).thenReturn(GetRecordsResponse.builder()
-                        .nextShardIterator("nextShardIterator")
-                        .records(
-                                Record.builder().sequenceNumber("1")
-                                        .data(SdkBytes.fromString("Hello", 
Charset.defaultCharset()))
-                                        .build(),
-                                Record.builder().sequenceNumber("2")
-                                        .data(SdkBytes.fromString("Hello", 
Charset.defaultCharset()))
-                                        .build())
-                        .build());
+                .getRecords(any(GetRecordsRequest.class))).thenReturn(r1);
         when(kinesisClient
                 .getShardIterator(any(GetShardIteratorRequest.class)))
                 
.thenReturn(GetShardIteratorResponse.builder().shardIterator("shardIterator").build());
@@ -179,15 +181,20 @@ public class KinesisConsumerClosedShardWithSilentTest {
     }
 
     @Test
-    public void itUsesTheShardIteratorOnSubsiquentPolls() throws Exception {
-        underTest.poll();
-        underTest.poll();
+    public void itUsesTheShardIteratorOnSubsequentPolls() throws Exception {
+        // should not be closed on start
+        Assertions.assertFalse(underTest.isShardClosed("shardId"));
+
+        underTest.poll(); // pull records and reached EOL of shard
+        underTest.poll(); // the shard is now closed
 
         final ArgumentCaptor<GetRecordsRequest> getRecordsReqCap = 
ArgumentCaptor.forClass(GetRecordsRequest.class);
         // On second call it uses the one returned from the first call
         verify(kinesisClient, 
times(1)).getShardIterator(any(GetShardIteratorRequest.class));
-        verify(kinesisClient, times(2)).getRecords(getRecordsReqCap.capture());
+        verify(kinesisClient, times(1)).getRecords(getRecordsReqCap.capture());
         assertThat(getRecordsReqCap.getAllValues().get(0).shardIterator(), 
is("shardIterator"));
-        assertThat(getRecordsReqCap.getAllValues().get(1).shardIterator(), 
is("nextShardIterator"));
+
+        // should be closed
+        Assertions.assertTrue(underTest.isShardClosed("shardId"));
     }
 }
diff --git 
a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2KinesisComponentBuilderFactory.java
 
b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2KinesisComponentBuilderFactory.java
index fc2b21e5159..b2ec314042f 100644
--- 
a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2KinesisComponentBuilderFactory.java
+++ 
b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2KinesisComponentBuilderFactory.java
@@ -217,11 +217,11 @@ public interface Aws2KinesisComponentBuilderFactory {
         
         /**
          * Define what will be the behavior in case of shard closed. Possible
-         * value are ignore, silent and fail. In case of ignore a message will
-         * be logged and the consumer will restart from the beginning,in case 
of
-         * silent there will be no logging and the consumer will start from the
-         * beginning,in case of fail a ReachedClosedStateException will be
-         * raised.
+         * value are ignore, silent and fail. In case of ignore a WARN message
+         * will be logged once and the consumer will not process new messages
+         * until restarted,in case of silent there will be no logging and the
+         * consumer will not process new messages until restarted,in case of
+         * fail a ReachedClosedStateException will be thrown.
          * 
          * The option is a:
          * 
&lt;code&gt;org.apache.camel.component.aws2.kinesis.Kinesis2ShardClosedStrategyEnum&lt;/code&gt;
 type.
diff --git 
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Kinesis2EndpointBuilderFactory.java
 
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Kinesis2EndpointBuilderFactory.java
index 15382b0e424..b93f3f3ad85 100644
--- 
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Kinesis2EndpointBuilderFactory.java
+++ 
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Kinesis2EndpointBuilderFactory.java
@@ -248,11 +248,11 @@ public interface Kinesis2EndpointBuilderFactory {
         }
         /**
          * Define what will be the behavior in case of shard closed. Possible
-         * value are ignore, silent and fail. In case of ignore a message will
-         * be logged and the consumer will restart from the beginning,in case 
of
-         * silent there will be no logging and the consumer will start from the
-         * beginning,in case of fail a ReachedClosedStateException will be
-         * raised.
+         * value are ignore, silent and fail. In case of ignore a WARN message
+         * will be logged once and the consumer will not process new messages
+         * until restarted,in case of silent there will be no logging and the
+         * consumer will not process new messages until restarted,in case of
+         * fail a ReachedClosedStateException will be thrown.
          * 
          * The option is a:
          * 
<code>org.apache.camel.component.aws2.kinesis.Kinesis2ShardClosedStrategyEnum</code>
 type.
@@ -269,11 +269,11 @@ public interface Kinesis2EndpointBuilderFactory {
         }
         /**
          * Define what will be the behavior in case of shard closed. Possible
-         * value are ignore, silent and fail. In case of ignore a message will
-         * be logged and the consumer will restart from the beginning,in case 
of
-         * silent there will be no logging and the consumer will start from the
-         * beginning,in case of fail a ReachedClosedStateException will be
-         * raised.
+         * value are ignore, silent and fail. In case of ignore a WARN message
+         * will be logged once and the consumer will not process new messages
+         * until restarted,in case of silent there will be no logging and the
+         * consumer will not process new messages until restarted,in case of
+         * fail a ReachedClosedStateException will be thrown.
          * 
          * The option will be converted to a
          * 
<code>org.apache.camel.component.aws2.kinesis.Kinesis2ShardClosedStrategyEnum</code>
 type.
diff --git 
a/dsl/camel-kotlin-api/src/generated/kotlin/org/apache/camel/kotlin/components/Aws2KinesisUriDsl.kt
 
b/dsl/camel-kotlin-api/src/generated/kotlin/org/apache/camel/kotlin/components/Aws2KinesisUriDsl.kt
index db966f81ff9..2a221fc441c 100644
--- 
a/dsl/camel-kotlin-api/src/generated/kotlin/org/apache/camel/kotlin/components/Aws2KinesisUriDsl.kt
+++ 
b/dsl/camel-kotlin-api/src/generated/kotlin/org/apache/camel/kotlin/components/Aws2KinesisUriDsl.kt
@@ -145,9 +145,9 @@ public class Aws2KinesisUriDsl(
 
   /**
    * Define what will be the behavior in case of shard closed. Possible value 
are ignore, silent and
-   * fail. In case of ignore a message will be logged and the consumer will 
restart from the
-   * beginning,in case of silent there will be no logging and the consumer 
will start from the
-   * beginning,in case of fail a ReachedClosedStateException will be raised
+   * fail. In case of ignore a WARN message will be logged once and the 
consumer will not process new
+   * messages until restarted,in case of silent there will be no logging and 
the consumer will not
+   * process new messages until restarted,in case of fail a 
ReachedClosedStateException will be thrown
    */
   public fun shardClosed(shardClosed: String) {
     it.property("shardClosed", shardClosed)

Reply via email to