AHeise commented on code in PR #193:
URL: https://github.com/apache/flink-connector-aws/pull/193#discussion_r2014306328


##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java:
##########
@@ -139,13 +139,25 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
     /** When we mark a split as finished, we will only assign its child splits to the subtasks. */
     private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent splitsFinishedEvent) {
         splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds());
-        splitAssignment
-                .get(subtaskId)
-                .removeIf(
-                        split ->
-                                splitsFinishedEvent
-                                        .getFinishedSplitIds()
-                                        .contains(split.splitId()));
+
+        Set<DynamoDbStreamsShardSplit> splitsAssignment = splitAssignment.get(subtaskId);
+        // During recovery, splitAssignment may return null since there might be no split assigned
+        // to the subtask, but there might still be a SplitsFinishedEvent from that subtask.
+        // We do not do child shard assignment in that case, since that might lead to child
+        // shards being assigned before there are any readers.
+        if (splitsAssignment == null) {

Review Comment:
   I'd not weaken the consistency checks here for all events. I'd mark the events as recovered and then allow some leniency for those (not even a WARN, because this is expected).


