hlteoh37 commented on code in PR #193:
URL: https://github.com/apache/flink-connector-aws/pull/193#discussion_r2014227005


##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java:
##########
@@ -139,13 +139,25 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
     /** When we mark a split as finished, we will only assign its child splits to the subtasks. */
     private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent splitsFinishedEvent) {
         splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds());
-        splitAssignment
-                .get(subtaskId)
-                .removeIf(
-                        split ->
-                                splitsFinishedEvent
-                                        .getFinishedSplitIds()
-                                        .contains(split.splitId()));
+
+        Set<DynamoDbStreamsShardSplit> splitsAssignment = splitAssignment.get(subtaskId);
+        // during recovery, splitAssignment may return null since there might be no split assigned
+        // to the subtask, but there might be SplitsFinishedEvent from that subtask.
+        // We will not do child shard assignment if that is the case since that might lead to child
+        // shards trying to get assigned before there being any readers.
+        if (splitsAssignment == null) {

Review Comment:
   Why don't we make this a generic warning if we don't find the finished split in the assigned splits (instead of only when no split is assigned to the subtask)?
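The reviewer's suggestion can be sketched roughly as follows: rather than special-casing a `null` assignment, check each finished split ID against the subtask's assigned splits and warn about any ID that is not found. The recovery case (nothing assigned to the subtask) then falls out as one instance of the general check. This is a minimal, self-contained sketch under stated assumptions, not the connector's code: `FinishedSplitHandler`, `ShardSplit` (a stand-in for `DynamoDbStreamsShardSplit`), `assign`, `assignedTo` and `unknownFinishedSplitIds` are hypothetical; it uses `java.util.logging` only to avoid dependencies; and it does not model `splitTracker` or child shard assignment.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Logger;

/** Hypothetical, simplified model of the enumerator's split bookkeeping (not the real class). */
final class FinishedSplitHandler {
    /** Hypothetical stand-in for DynamoDbStreamsShardSplit; only the split ID is modelled. */
    record ShardSplit(String splitId) {}

    private static final Logger LOG = Logger.getLogger(FinishedSplitHandler.class.getName());

    // subtaskId -> splits currently assigned to that subtask
    private final Map<Integer, Set<ShardSplit>> splitAssignment = new HashMap<>();

    // Finished split IDs that were not found in the reporting subtask's assignment
    final List<String> unknownFinishedSplitIds = new ArrayList<>();

    void assign(int subtaskId, ShardSplit split) {
        splitAssignment.computeIfAbsent(subtaskId, k -> new HashSet<>()).add(split);
    }

    Set<ShardSplit> assignedTo(int subtaskId) {
        return splitAssignment.getOrDefault(subtaskId, Set.of());
    }

    void handleFinishedSplits(int subtaskId, Set<String> finishedSplitIds) {
        // May be null, e.g. during recovery when nothing is assigned to this subtask yet
        Set<ShardSplit> assigned = splitAssignment.get(subtaskId);
        for (String splitId : finishedSplitIds) {
            boolean removed =
                    assigned != null && assigned.removeIf(split -> split.splitId().equals(splitId));
            if (!removed) {
                // One generic warning covers both "no assignment" and "split not in assignment"
                unknownFinishedSplitIds.add(splitId);
                LOG.warning(String.format(
                        "Subtask %d reported finished split %s, which is not in its assigned splits",
                        subtaskId, splitId));
            }
        }
    }
}
```

Folding the `null` case into the per-split check keeps a single warning path; whether child shard assignment should still be skipped for unknown splits is a separate decision the sketch leaves out.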


