Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/5992#discussion_r187788338
--- Diff:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
---
@@ -353,19 +355,21 @@ protected static boolean isRecoverableException(AmazonServiceException ex) {
private List<StreamShardHandle> getShardsOfStream(String streamName, @Nullable String lastSeenShardId) throws InterruptedException {
List<StreamShardHandle> shardsOfStream = new ArrayList<>();
- DescribeStreamResult describeStreamResult;
+ // List Shards returns just the first 1000 shard entries. In order to read the entire stream,
+ // we need to use the returned nextToken to get additional shards.
+ ListShardsResult listShardsResult;
+ String startShardToken = null;
do {
- describeStreamResult = describeStream(streamName, lastSeenShardId);
-
- List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
+ listShardsResult = listShards(streamName, lastSeenShardId, startShardToken);
--- End diff --
If we catch the `ExpiredNextTokenException` within `listShards(...)`, then
`null` will be returned as the result, correct?
If so, the `shardsOfStream` list built up so far will be returned
immediately, regardless of whether or not there are more shards following.
Expired tokens might not be too common here, but I wonder if we can handle
this case more gracefully (e.g., by re-fetching a token to make sure that
there really are no more shards).
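
To illustrate the idea, here is a rough sketch of one way the loop could recover. It is not the actual `KinesisProxy` code and does not use the AWS SDK: `ShardLister`, `Page`, `ExpiredTokenException`, and the `maxRestarts` parameter are all hypothetical stand-ins. The assumption is that, when a token expires, we can start a fresh listing from the last shard ID collected so far instead of returning the partial list.

```java
import java.util.ArrayList;
import java.util.List;

public class ShardListingSketch {

    /** Hypothetical stand-in for the SDK's ExpiredNextTokenException. */
    static class ExpiredTokenException extends RuntimeException {}

    /** Hypothetical page of results: shard IDs plus an optional continuation token. */
    static class Page {
        final List<String> shardIds;
        final String nextToken; // null when there are no further pages

        Page(List<String> shardIds, String nextToken) {
            this.shardIds = shardIds;
            this.nextToken = nextToken;
        }
    }

    /**
     * Hypothetical listing call. Either a start shard ID or a token is passed,
     * never both; both are null for a listing from the very beginning.
     */
    interface ShardLister {
        Page list(String exclusiveStartShardId, String nextToken);
    }

    /**
     * Collects all shards after lastSeenShardId. If a token expires mid-way,
     * the listing restarts after the last shard collected so far rather than
     * returning a partial result. Gives up after maxRestarts expired tokens.
     */
    static List<String> listAllShards(ShardLister lister, String lastSeenShardId, int maxRestarts) {
        List<String> shards = new ArrayList<>();
        String startShardId = lastSeenShardId;
        String token = null;
        int restarts = 0;

        while (true) {
            Page page;
            try {
                page = (token == null)
                        ? lister.list(startShardId, null)
                        : lister.list(null, token);
            } catch (ExpiredTokenException e) {
                if (++restarts > maxRestarts) {
                    throw e; // surface the failure instead of silently truncating
                }
                // Resume right after the last shard we already have.
                if (!shards.isEmpty()) {
                    startShardId = shards.get(shards.size() - 1);
                }
                token = null;
                continue;
            }
            shards.addAll(page.shardIds);
            token = page.nextToken;
            if (token == null) {
                return shards;
            }
        }
    }
}
```

With something along these lines, an expired token would cost one extra request rather than a truncated shard list, and the restart bound keeps the loop from spinning if tokens keep expiring.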
---