elphastori commented on code in PR #21102:
URL: https://github.com/apache/flink/pull/21102#discussion_r1009635044


##########
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/DynamoDBStreamsProxy.java:
##########
@@ -110,6 +111,24 @@ public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSe
         return result;
     }
 
+    @Override
+    public String getShardIterator(
+            StreamShardHandle shard, String shardIteratorType, @Nullable Object startingMarker)
+            throws InterruptedException {
+        try {
+            return super.getShardIterator(shard, shardIteratorType, startingMarker);
+        } catch (ResourceNotFoundException re) {
+            if (LOG.isInfoEnabled()) {
+                LOG.info(
+                        "Received ResourceNotFoundException. "
+                                + "Shard {} of stream {} is no longer valid, marking it as complete.",
+                        shard.getShard().getShardId(),
+                        shard.getStreamName());
+            }
+            return null;
+        }
+    }
+

Review Comment:
   Thanks, that's a good catch. From analysing the code, `getShardIterator` is
   only called after `getShardList`. In `DynamoDBStreamsProxy` in particular,
   `getShardList` guarantees that deleted streams/tables throw an unhandled
   `ResourceNotFoundException`.
   
   This holds both when the job is
   - starting (with or without a savepoint)
   - running, while the consumer is discovering new shards.
   
   I've now tested the following scenarios to verify the analysis. The
   failure modes have not changed:
   - Starting the Flink job with or without a savepoint for a recently deleted
     stream/table (< 24 hrs) results in all shards getting closed: the DDB
     Streams service returns a stream with `null` shard iterators
   - Deleting a table/stream on the fly also results in all shards getting
     closed
   - Using a stream/table that has been deleted for more than 24 hrs (or
     providing an altogether incorrect stream/table name) results in the job
     failing with a `ResourceNotFoundException` thrown in `getShardList`
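
For illustration, here is a minimal, self-contained sketch of the behaviour described above: a lookup that fails with a not-found error is turned into a `null` iterator, which the caller treats as "shard complete". It does not use the Flink or AWS classes. `ShardIteratorSketch`, `IteratorSource`, `getShardIteratorOrNull`, and the nested `ResourceNotFoundException` are hypothetical stand-ins invented for this example, not the connector's actual API.

```java
import java.util.HashMap;
import java.util.Map;

class ShardIteratorSketch {

    /** Hypothetical stand-in for the SDK exception caught in the diff above. */
    static class ResourceNotFoundException extends RuntimeException {
        ResourceNotFoundException(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for the parent proxy's shard iterator lookup. */
    interface IteratorSource {
        String getShardIterator(String streamName, String shardId);
    }

    /**
     * Mirrors the shape of the override in the diff: a shard that no longer
     * exists yields a null iterator instead of propagating the exception.
     */
    static String getShardIteratorOrNull(
            IteratorSource source, String streamName, String shardId) {
        try {
            return source.getShardIterator(streamName, shardId);
        } catch (ResourceNotFoundException e) {
            // A null iterator is the signal for "nothing left to read".
            return null;
        }
    }

    public static void main(String[] args) {
        // Assumed test data: one live shard; everything else counts as expired.
        Map<String, String> liveShards = new HashMap<>();
        liveShards.put("shard-0", "iter-0");

        IteratorSource source = (streamName, shardId) -> {
            String iterator = liveShards.get(shardId);
            if (iterator == null) {
                throw new ResourceNotFoundException(
                        "Shard " + shardId + " of stream " + streamName + " not found");
            }
            return iterator;
        };

        System.out.println(getShardIteratorOrNull(source, "orders", "shard-0")); // prints "iter-0"
        System.out.println(getShardIteratorOrNull(source, "orders", "shard-9")); // prints "null"
    }
}
```

Only the iterator lookup swallows the exception in this sketch. A failure in the earlier listing step would still propagate, which matches the third scenario above.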


