This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 096c3c085c2 CAMEL-20958: camel-aws - Kinesis consumer loops on closed shards (#15230)
096c3c085c2 is described below
commit 096c3c085c20927f8e7deaf33f0764b203aec3ce
Author: Claus Ibsen <[email protected]>
AuthorDate: Wed Aug 21 08:21:16 2024 +0200
CAMEL-20958: camel-aws - Kinesis consumer loops on closed shards (#15230)
CAMEL-20958: camel-aws - Kinesis consumer loops on closed shards
---
.../camel/catalog/components/aws2-kinesis.json | 4 +--
.../camel/component/aws2/kinesis/aws2-kinesis.json | 4 +--
.../aws2/kinesis/Kinesis2Configuration.java | 6 ++--
.../component/aws2/kinesis/Kinesis2Consumer.java | 24 ++++++++++---
.../KinesisConsumerClosedShardWithSilentTest.java | 39 +++++++++++++---------
.../dsl/Aws2KinesisComponentBuilderFactory.java | 10 +++---
.../dsl/Kinesis2EndpointBuilderFactory.java | 20 +++++------
.../camel/kotlin/components/Aws2KinesisUriDsl.kt | 6 ++--
8 files changed, 68 insertions(+), 45 deletions(-)
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-kinesis.json
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-kinesis.json
index 90713f67676..5f46322e0b6 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-kinesis.json
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-kinesis.json
@@ -32,7 +32,7 @@
"iteratorType": { "index": 6, "kind": "property", "displayName": "Iterator
Type", "group": "consumer", "label": "consumer", "required": false, "type":
"object", "javaType":
"software.amazon.awssdk.services.kinesis.model.ShardIteratorType", "enum": [
"AT_SEQUENCE_NUMBER", "AFTER_SEQUENCE_NUMBER", "TRIM_HORIZON", "LATEST",
"AT_TIMESTAMP", "null" ], "deprecated": false, "autowired": false, "secret":
false, "defaultValue": "TRIM_HORIZON", "configurationClass":
"org.apache.camel.component [...]
"maxResultsPerRequest": { "index": 7, "kind": "property", "displayName":
"Max Results Per Request", "group": "consumer", "label": "consumer",
"required": false, "type": "integer", "javaType": "int", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": 1, "configurationClass":
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration",
"configurationField": "configuration", "description": "Maximum number of
records that will be fetched in each poll" },
"sequenceNumber": { "index": 8, "kind": "property", "displayName":
"Sequence Number", "group": "consumer", "label": "consumer", "required": false,
"type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration",
"configurationField": "configuration", "description": "The sequence number to
start polling from. Required if iteratorType is set to AFTER_SEQUENC [...]
- "shardClosed": { "index": 9, "kind": "property", "displayName": "Shard
Closed", "group": "consumer", "label": "consumer", "required": false, "type":
"object", "javaType":
"org.apache.camel.component.aws2.kinesis.Kinesis2ShardClosedStrategyEnum",
"enum": [ "ignore", "fail", "silent" ], "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "ignore", "configurationClass":
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration",
"configurationField": "configur [...]
+ "shardClosed": { "index": 9, "kind": "property", "displayName": "Shard
Closed", "group": "consumer", "label": "consumer", "required": false, "type":
"object", "javaType":
"org.apache.camel.component.aws2.kinesis.Kinesis2ShardClosedStrategyEnum",
"enum": [ "ignore", "fail", "silent" ], "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "ignore", "configurationClass":
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration",
"configurationField": "configur [...]
"shardId": { "index": 10, "kind": "property", "displayName": "Shard Id",
"group": "consumer", "label": "consumer", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": false, "configurationClass":
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration",
"configurationField": "configuration", "description": "Defines which shardId in
the Kinesis stream to get records from" },
"shardMonitorInterval": { "index": 11, "kind": "property", "displayName":
"Shard Monitor Interval", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "integer", "javaType": "long",
"deprecated": false, "autowired": false, "secret": false, "defaultValue":
10000, "configurationClass":
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration",
"configurationField": "configuration", "description": "The interval in
milliseconds to wait between [...]
"lazyStartProducer": { "index": 12, "kind": "property", "displayName":
"Lazy Start Producer", "group": "producer", "label": "producer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false, "description":
"Whether the producer should be started lazy (on the first message). By
starting lazy you can use this to allow CamelContext and routes to startup in
situations where a producer may otherwise fai [...]
@@ -74,7 +74,7 @@
"maxResultsPerRequest": { "index": 6, "kind": "parameter", "displayName":
"Max Results Per Request", "group": "consumer", "label": "consumer",
"required": false, "type": "integer", "javaType": "int", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": 1, "configurationClass":
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration",
"configurationField": "configuration", "description": "Maximum number of
records that will be fetched in each poll" },
"sendEmptyMessageWhenIdle": { "index": 7, "kind": "parameter",
"displayName": "Send Empty Message When Idle", "group": "consumer", "label":
"consumer", "required": false, "type": "boolean", "javaType": "boolean",
"deprecated": false, "autowired": false, "secret": false, "defaultValue":
false, "description": "If the polling consumer did not poll any files, you can
enable this option to send an empty message (no body) instead." },
"sequenceNumber": { "index": 8, "kind": "parameter", "displayName":
"Sequence Number", "group": "consumer", "label": "consumer", "required": false,
"type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration",
"configurationField": "configuration", "description": "The sequence number to
start polling from. Required if iteratorType is set to AFTER_SEQUEN [...]
- "shardClosed": { "index": 9, "kind": "parameter", "displayName": "Shard
Closed", "group": "consumer", "label": "consumer", "required": false, "type":
"object", "javaType":
"org.apache.camel.component.aws2.kinesis.Kinesis2ShardClosedStrategyEnum",
"enum": [ "ignore", "fail", "silent" ], "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "ignore", "configurationClass":
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration",
"configurationField": "configu [...]
+ "shardClosed": { "index": 9, "kind": "parameter", "displayName": "Shard
Closed", "group": "consumer", "label": "consumer", "required": false, "type":
"object", "javaType":
"org.apache.camel.component.aws2.kinesis.Kinesis2ShardClosedStrategyEnum",
"enum": [ "ignore", "fail", "silent" ], "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "ignore", "configurationClass":
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration",
"configurationField": "configu [...]
"shardId": { "index": 10, "kind": "parameter", "displayName": "Shard Id",
"group": "consumer", "label": "consumer", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": false, "configurationClass":
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration",
"configurationField": "configuration", "description": "Defines which shardId in
the Kinesis stream to get records from" },
"bridgeErrorHandler": { "index": 11, "kind": "parameter", "displayName":
"Bridge Error Handler", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Allows for bridging the consumer to the
Camel routing Error Handler, which mean any exceptions (if possible) occurred
while the Camel consumer is trying to pickup incoming [...]
"exceptionHandler": { "index": 12, "kind": "parameter", "displayName":
"Exception Handler", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "object", "javaType":
"org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.",
"deprecated": false, "autowired": false, "secret": false, "description": "To
let the consumer use a custom ExceptionHandler. Notice if the option
bridgeErrorHandler is enabled then this option is not in use. By de [...]
diff --git
a/components/camel-aws/camel-aws2-kinesis/src/generated/resources/META-INF/org/apache/camel/component/aws2/kinesis/aws2-kinesis.json
b/components/camel-aws/camel-aws2-kinesis/src/generated/resources/META-INF/org/apache/camel/component/aws2/kinesis/aws2-kinesis.json
index 90713f67676..5f46322e0b6 100644
---
a/components/camel-aws/camel-aws2-kinesis/src/generated/resources/META-INF/org/apache/camel/component/aws2/kinesis/aws2-kinesis.json
+++
b/components/camel-aws/camel-aws2-kinesis/src/generated/resources/META-INF/org/apache/camel/component/aws2/kinesis/aws2-kinesis.json
@@ -32,7 +32,7 @@
"iteratorType": { "index": 6, "kind": "property", "displayName": "Iterator
Type", "group": "consumer", "label": "consumer", "required": false, "type":
"object", "javaType":
"software.amazon.awssdk.services.kinesis.model.ShardIteratorType", "enum": [
"AT_SEQUENCE_NUMBER", "AFTER_SEQUENCE_NUMBER", "TRIM_HORIZON", "LATEST",
"AT_TIMESTAMP", "null" ], "deprecated": false, "autowired": false, "secret":
false, "defaultValue": "TRIM_HORIZON", "configurationClass":
"org.apache.camel.component [...]
"maxResultsPerRequest": { "index": 7, "kind": "property", "displayName":
"Max Results Per Request", "group": "consumer", "label": "consumer",
"required": false, "type": "integer", "javaType": "int", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": 1, "configurationClass":
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration",
"configurationField": "configuration", "description": "Maximum number of
records that will be fetched in each poll" },
"sequenceNumber": { "index": 8, "kind": "property", "displayName":
"Sequence Number", "group": "consumer", "label": "consumer", "required": false,
"type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration",
"configurationField": "configuration", "description": "The sequence number to
start polling from. Required if iteratorType is set to AFTER_SEQUENC [...]
- "shardClosed": { "index": 9, "kind": "property", "displayName": "Shard
Closed", "group": "consumer", "label": "consumer", "required": false, "type":
"object", "javaType":
"org.apache.camel.component.aws2.kinesis.Kinesis2ShardClosedStrategyEnum",
"enum": [ "ignore", "fail", "silent" ], "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "ignore", "configurationClass":
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration",
"configurationField": "configur [...]
+ "shardClosed": { "index": 9, "kind": "property", "displayName": "Shard
Closed", "group": "consumer", "label": "consumer", "required": false, "type":
"object", "javaType":
"org.apache.camel.component.aws2.kinesis.Kinesis2ShardClosedStrategyEnum",
"enum": [ "ignore", "fail", "silent" ], "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "ignore", "configurationClass":
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration",
"configurationField": "configur [...]
"shardId": { "index": 10, "kind": "property", "displayName": "Shard Id",
"group": "consumer", "label": "consumer", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": false, "configurationClass":
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration",
"configurationField": "configuration", "description": "Defines which shardId in
the Kinesis stream to get records from" },
"shardMonitorInterval": { "index": 11, "kind": "property", "displayName":
"Shard Monitor Interval", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "integer", "javaType": "long",
"deprecated": false, "autowired": false, "secret": false, "defaultValue":
10000, "configurationClass":
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration",
"configurationField": "configuration", "description": "The interval in
milliseconds to wait between [...]
"lazyStartProducer": { "index": 12, "kind": "property", "displayName":
"Lazy Start Producer", "group": "producer", "label": "producer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false, "description":
"Whether the producer should be started lazy (on the first message). By
starting lazy you can use this to allow CamelContext and routes to startup in
situations where a producer may otherwise fai [...]
@@ -74,7 +74,7 @@
"maxResultsPerRequest": { "index": 6, "kind": "parameter", "displayName":
"Max Results Per Request", "group": "consumer", "label": "consumer",
"required": false, "type": "integer", "javaType": "int", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": 1, "configurationClass":
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration",
"configurationField": "configuration", "description": "Maximum number of
records that will be fetched in each poll" },
"sendEmptyMessageWhenIdle": { "index": 7, "kind": "parameter",
"displayName": "Send Empty Message When Idle", "group": "consumer", "label":
"consumer", "required": false, "type": "boolean", "javaType": "boolean",
"deprecated": false, "autowired": false, "secret": false, "defaultValue":
false, "description": "If the polling consumer did not poll any files, you can
enable this option to send an empty message (no body) instead." },
"sequenceNumber": { "index": 8, "kind": "parameter", "displayName":
"Sequence Number", "group": "consumer", "label": "consumer", "required": false,
"type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration",
"configurationField": "configuration", "description": "The sequence number to
start polling from. Required if iteratorType is set to AFTER_SEQUEN [...]
- "shardClosed": { "index": 9, "kind": "parameter", "displayName": "Shard
Closed", "group": "consumer", "label": "consumer", "required": false, "type":
"object", "javaType":
"org.apache.camel.component.aws2.kinesis.Kinesis2ShardClosedStrategyEnum",
"enum": [ "ignore", "fail", "silent" ], "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "ignore", "configurationClass":
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration",
"configurationField": "configu [...]
+ "shardClosed": { "index": 9, "kind": "parameter", "displayName": "Shard
Closed", "group": "consumer", "label": "consumer", "required": false, "type":
"object", "javaType":
"org.apache.camel.component.aws2.kinesis.Kinesis2ShardClosedStrategyEnum",
"enum": [ "ignore", "fail", "silent" ], "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "ignore", "configurationClass":
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration",
"configurationField": "configu [...]
"shardId": { "index": 10, "kind": "parameter", "displayName": "Shard Id",
"group": "consumer", "label": "consumer", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": false, "configurationClass":
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration",
"configurationField": "configuration", "description": "Defines which shardId in
the Kinesis stream to get records from" },
"bridgeErrorHandler": { "index": 11, "kind": "parameter", "displayName":
"Bridge Error Handler", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Allows for bridging the consumer to the
Camel routing Error Handler, which mean any exceptions (if possible) occurred
while the Camel consumer is trying to pickup incoming [...]
"exceptionHandler": { "index": 12, "kind": "parameter", "displayName":
"Exception Handler", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "object", "javaType":
"org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.",
"deprecated": false, "autowired": false, "secret": false, "description": "To
let the consumer use a custom ExceptionHandler. Notice if the option
bridgeErrorHandler is enabled then this option is not in use. By de [...]
diff --git
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java
index 6ba5d24d338..ae2b1ed1682 100644
---
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java
+++
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java
@@ -60,9 +60,9 @@ public class Kinesis2Configuration implements Cloneable {
private String sequenceNumber = "";
@UriParam(label = "consumer", defaultValue = "ignore",
description = "Define what will be the behavior in case of shard
closed. Possible value are ignore, silent and fail."
- + " In case of ignore a message will be logged and
the consumer will restart from the beginning,"
- + "in case of silent there will be no logging and
the consumer will start from the beginning,"
- + "in case of fail a ReachedClosedStateException
will be raised")
+ + " In case of ignore a WARN message will be
logged once and the consumer will not process new messages until restarted,"
+ + "in case of silent there will be no logging and
the consumer will not process new messages until restarted,"
+ + "in case of fail a ReachedClosedStateException
will be thrown")
private Kinesis2ShardClosedStrategyEnum shardClosed;
@UriParam(label = "proxy", enums = "HTTP,HTTPS", defaultValue = "HTTPS",
description = "To define a proxy protocol when instantiating the
Kinesis client")
diff --git
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
index c33cedab3ba..85fa04ecc25 100644
---
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
+++
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
@@ -53,6 +53,7 @@ public class Kinesis2Consumer extends
ScheduledBatchPollingConsumer implements R
private ResumeStrategy resumeStrategy;
private final Map<String, String> currentShardIterators = new
java.util.HashMap<>();
+ private final Set<String> warnLogged = new HashSet<>();
private volatile List<Shard> currentShardList = List.of();
private static final String SHARD_MONITOR_EXECUTOR_NAME =
"Kinesis_shard_monitor";
@@ -71,11 +72,23 @@ public class Kinesis2Consumer extends
ScheduledBatchPollingConsumer implements R
this.connection = connection;
}
+ public boolean isShardClosed(String shardId) {
+ return currentShardIterators.get(shardId) == null &&
currentShardIterators.containsKey(shardId);
+ }
+
@Override
protected int poll() throws Exception {
var processedExchangeCount = new AtomicInteger(0);
- if (!getEndpoint().getConfiguration().getShardId().isEmpty()) {
+ String shardId = getEndpoint().getConfiguration().getShardId();
+ if (!shardId.isEmpty()) {
+ // skip if the shard is closed
+ if (isShardClosed(shardId)) {
+ // There was previously a shardIterator but shard is now closed
+ handleClosedShard(shardId);
+ return 0;
+ }
+
var request = DescribeStreamRequest
.builder()
.streamName(getEndpoint().getConfiguration().getStreamName())
@@ -129,7 +142,7 @@ public class Kinesis2Consumer extends
ScheduledBatchPollingConsumer implements R
final Shard shard,
final KinesisConnection kinesisConnection,
AtomicInteger processedExchangeCount) {
- String shardIterator = null;
+ String shardIterator;
try {
shardIterator = getShardIterator(shard, kinesisConnection);
} catch (InterruptedException e) {
@@ -222,6 +235,7 @@ public class Kinesis2Consumer extends
ScheduledBatchPollingConsumer implements R
if (currentShardIterators.containsKey(shardId)) {
// There was previously a shardIterator but shard is now closed
handleClosedShard(shardId);
+ return null; // we cannot get the shard again as its closed
}
GetShardIteratorRequest.Builder request =
GetShardIteratorRequest.builder()
@@ -264,8 +278,10 @@ public class Kinesis2Consumer extends
ScheduledBatchPollingConsumer implements R
private void handleClosedShard(String shardId) {
switch (getEndpoint().getConfiguration().getShardClosed()) {
case ignore:
- LOG.warn("The shard with id={} on stream {} reached CLOSE
status",
- shardId,
getEndpoint().getConfiguration().getStreamName());
+ if (warnLogged.add(shardId)) {
+ LOG.warn("The shard with id={} on stream {} reached CLOSE
status",
+ shardId,
getEndpoint().getConfiguration().getStreamName());
+ }
break;
case silent:
break;
diff --git
a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
index 643bb199ad9..3c7303b8b2a 100644
---
a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
+++
b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.impl.DefaultCamelContext;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -70,22 +71,23 @@ public class KinesisConsumerClosedShardWithSilentTest {
@BeforeEach
public void setup() {
- SequenceNumberRange range =
SequenceNumberRange.builder().endingSequenceNumber("20").build();
+ SequenceNumberRange range =
SequenceNumberRange.builder().endingSequenceNumber("2").build();
Shard shard =
Shard.builder().shardId("shardId").sequenceNumberRange(range).build();
ArrayList<Shard> shardList = new ArrayList<>();
shardList.add(shard);
+ var r1 = GetRecordsResponse.builder()
+ .nextShardIterator(null)
+ .records(
+ Record.builder().sequenceNumber("1")
+ .data(SdkBytes.fromString("Hello",
Charset.defaultCharset()))
+ .build(),
+ Record.builder().sequenceNumber("2")
+ .data(SdkBytes.fromString("Bye",
Charset.defaultCharset()))
+ .build())
+ .build();
when(kinesisClient
-
.getRecords(any(GetRecordsRequest.class))).thenReturn(GetRecordsResponse.builder()
- .nextShardIterator("nextShardIterator")
- .records(
- Record.builder().sequenceNumber("1")
- .data(SdkBytes.fromString("Hello",
Charset.defaultCharset()))
- .build(),
- Record.builder().sequenceNumber("2")
- .data(SdkBytes.fromString("Hello",
Charset.defaultCharset()))
- .build())
- .build());
+ .getRecords(any(GetRecordsRequest.class))).thenReturn(r1);
when(kinesisClient
.getShardIterator(any(GetShardIteratorRequest.class)))
.thenReturn(GetShardIteratorResponse.builder().shardIterator("shardIterator").build());
@@ -179,15 +181,20 @@ public class KinesisConsumerClosedShardWithSilentTest {
}
@Test
- public void itUsesTheShardIteratorOnSubsiquentPolls() throws Exception {
- underTest.poll();
- underTest.poll();
+ public void itUsesTheShardIteratorOnSubsequentPolls() throws Exception {
+ // should not be closed on start
+ Assertions.assertFalse(underTest.isShardClosed("shardId"));
+
+ underTest.poll(); // pull records and reached EOL of shard
+ underTest.poll(); // the shard is now closed
final ArgumentCaptor<GetRecordsRequest> getRecordsReqCap =
ArgumentCaptor.forClass(GetRecordsRequest.class);
// On second call it uses the one returned from the first call
verify(kinesisClient,
times(1)).getShardIterator(any(GetShardIteratorRequest.class));
- verify(kinesisClient, times(2)).getRecords(getRecordsReqCap.capture());
+ verify(kinesisClient, times(1)).getRecords(getRecordsReqCap.capture());
assertThat(getRecordsReqCap.getAllValues().get(0).shardIterator(),
is("shardIterator"));
- assertThat(getRecordsReqCap.getAllValues().get(1).shardIterator(),
is("nextShardIterator"));
+
+ // should be closed
+ Assertions.assertTrue(underTest.isShardClosed("shardId"));
}
}
diff --git
a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2KinesisComponentBuilderFactory.java
b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2KinesisComponentBuilderFactory.java
index fc2b21e5159..b2ec314042f 100644
---
a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2KinesisComponentBuilderFactory.java
+++
b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2KinesisComponentBuilderFactory.java
@@ -217,11 +217,11 @@ public interface Aws2KinesisComponentBuilderFactory {
/**
* Define what will be the behavior in case of shard closed. Possible
- * value are ignore, silent and fail. In case of ignore a message will
- * be logged and the consumer will restart from the beginning,in case
of
- * silent there will be no logging and the consumer will start from the
- * beginning,in case of fail a ReachedClosedStateException will be
- * raised.
+ * value are ignore, silent and fail. In case of ignore a WARN message
+ * will be logged once and the consumer will not process new messages
+ * until restarted,in case of silent there will be no logging and the
+ * consumer will not process new messages until restarted,in case of
+ * fail a ReachedClosedStateException will be thrown.
*
* The option is a:
*
<code>org.apache.camel.component.aws2.kinesis.Kinesis2ShardClosedStrategyEnum</code>
type.
diff --git
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Kinesis2EndpointBuilderFactory.java
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Kinesis2EndpointBuilderFactory.java
index 15382b0e424..b93f3f3ad85 100644
---
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Kinesis2EndpointBuilderFactory.java
+++
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Kinesis2EndpointBuilderFactory.java
@@ -248,11 +248,11 @@ public interface Kinesis2EndpointBuilderFactory {
}
/**
* Define what will be the behavior in case of shard closed. Possible
- * value are ignore, silent and fail. In case of ignore a message will
- * be logged and the consumer will restart from the beginning,in case
of
- * silent there will be no logging and the consumer will start from the
- * beginning,in case of fail a ReachedClosedStateException will be
- * raised.
+ * value are ignore, silent and fail. In case of ignore a WARN message
+ * will be logged once and the consumer will not process new messages
+ * until restarted,in case of silent there will be no logging and the
+ * consumer will not process new messages until restarted,in case of
+ * fail a ReachedClosedStateException will be thrown.
*
* The option is a:
*
<code>org.apache.camel.component.aws2.kinesis.Kinesis2ShardClosedStrategyEnum</code>
type.
@@ -269,11 +269,11 @@ public interface Kinesis2EndpointBuilderFactory {
}
/**
* Define what will be the behavior in case of shard closed. Possible
- * value are ignore, silent and fail. In case of ignore a message will
- * be logged and the consumer will restart from the beginning,in case
of
- * silent there will be no logging and the consumer will start from the
- * beginning,in case of fail a ReachedClosedStateException will be
- * raised.
+ * value are ignore, silent and fail. In case of ignore a WARN message
+ * will be logged once and the consumer will not process new messages
+ * until restarted,in case of silent there will be no logging and the
+ * consumer will not process new messages until restarted,in case of
+ * fail a ReachedClosedStateException will be thrown.
*
* The option will be converted to a
*
<code>org.apache.camel.component.aws2.kinesis.Kinesis2ShardClosedStrategyEnum</code>
type.
diff --git
a/dsl/camel-kotlin-api/src/generated/kotlin/org/apache/camel/kotlin/components/Aws2KinesisUriDsl.kt
b/dsl/camel-kotlin-api/src/generated/kotlin/org/apache/camel/kotlin/components/Aws2KinesisUriDsl.kt
index db966f81ff9..2a221fc441c 100644
---
a/dsl/camel-kotlin-api/src/generated/kotlin/org/apache/camel/kotlin/components/Aws2KinesisUriDsl.kt
+++
b/dsl/camel-kotlin-api/src/generated/kotlin/org/apache/camel/kotlin/components/Aws2KinesisUriDsl.kt
@@ -145,9 +145,9 @@ public class Aws2KinesisUriDsl(
/**
* Define what will be the behavior in case of shard closed. Possible value
are ignore, silent and
- * fail. In case of ignore a message will be logged and the consumer will
restart from the
- * beginning,in case of silent there will be no logging and the consumer
will start from the
- * beginning,in case of fail a ReachedClosedStateException will be raised
+ * fail. In case of ignore a WARN message will be logged once and the
consumer will not process new
+ * messages until restarted,in case of silent there will be no logging and
the consumer will not
+ * process new messages until restarted,in case of fail a
ReachedClosedStateException will be thrown
*/
public fun shardClosed(shardClosed: String) {
it.property("shardClosed", shardClosed)