AHeise commented on code in PR #193:
URL: https://github.com/apache/flink-connector-aws/pull/193#discussion_r2011599746


##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/DynamoDbStreamsSourceReader.java:
##########
@@ -80,8 +105,65 @@ protected DynamoDbStreamsShardSplit toSplitType(
 
     @Override
     public void addSplits(List<DynamoDbStreamsShardSplit> splits) {
-        splits.forEach(this::registerShardMetricGroup);
-        super.addSplits(splits);
+        List<DynamoDbStreamsShardSplit> dynamoDbStreamsShardSplits = new ArrayList<>();
+        for (DynamoDbStreamsShardSplit split : splits) {
+            if (split.getShardId().equals(DYNAMODB_STREAMS_COMPLETED_SHARD_ID)) {
+                // Restore finished splits state
+                splitFinishedEvents.putAll(split.getFinishedSplitsMap());
+
+                // Replay all stored finished events
+                splitFinishedEvents.values().stream()
+                        .flatMap(Set::stream)
+                        .collect(Collectors.toSet())
+                        .forEach(
+                                splitId ->
+                                        context.sendSourceEventToCoordinator(
+                                                new SplitsFinishedEvent(
+                                                        Collections.singleton(splitId))));
+            } else {
+                dynamoDbStreamsShardSplits.add(split);
+            }
+        }
+        dynamoDbStreamsShardSplits.forEach(this::registerShardMetricGroup);
+        super.addSplits(dynamoDbStreamsShardSplits);
+    }
+
+    /**
+     * At snapshot, we also store the pending finished split ids in the current checkpoint so that
+     * in case we have to restore the reader from state, we also send the finished split ids
+     * otherwise we run a risk of data loss during restarts of the source because of the
+     * SplitsFinishedEvent going missing.
+     *
+     * @param checkpointId
+     * @return
+     */
+    @Override
+    public List<DynamoDbStreamsShardSplit> snapshotState(long checkpointId) {
+        this.currentCheckpointId = checkpointId;
+        List<DynamoDbStreamsShardSplit> splits = super.snapshotState(checkpointId);
+
+        if (!splitFinishedEvents.isEmpty()) {
+            // Always add a dedicated split for finished state
+            splits.add(
+                    new DynamoDbStreamsShardSplit(
+                            "",
+                            DYNAMODB_STREAMS_COMPLETED_SHARD_ID,
+                            StartingPosition.fromStart(),
+                            null,
+                            splitFinishedEvents));
+        }
+        return splits;

Review Comment:
   The access issue can be solved by maintaining a separate `finishedSplits` 
collection that you add to in `onSplitFinished`. `snapshotState` then needs to 
return the union of the splits from `SourceReaderBase` and your own finished 
splits.
   
   This would probably also solve `notifyCheckpointComplete`, because n shrinks 
to the number of finished splits (I'm not sure how many splits finish per 
checkpoint). If you want to stick to O(log N), you could combine both 
approaches and make `finishedSplits` a `TreeMap` again. Then you really only 
need to add a boolean flag to the split.
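
   A minimal sketch of what I mean, not tied to the actual connector classes. 
`SketchSplit` and `FinishedSplitsSketch` are hypothetical stand-ins for 
`DynamoDbStreamsShardSplit` and the reader, and the list of active splits is 
passed in rather than taken from `super.snapshotState`. The pruning rule in 
`notifyCheckpointComplete` (drop entries keyed strictly below the completed 
checkpoint id) is an assumption; please double-check it against the 
coordinator's checkpoint ordering.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical, simplified stand-in for DynamoDbStreamsShardSplit. */
final class SketchSplit {
    final String splitId;
    final boolean finished; // the boolean flag suggested above

    SketchSplit(String splitId, boolean finished) {
        this.splitId = splitId;
        this.finished = finished;
    }
}

/** Hypothetical bookkeeping helper; the real reader extends SourceReaderBase. */
final class FinishedSplitsSketch {
    // Key: id of the most recent snapshot taken before the split finished.
    private final TreeMap<Long, List<SketchSplit>> finishedSplits = new TreeMap<>();
    private long currentCheckpointId = Long.MIN_VALUE;

    /** Mirrors onSplitFinished: remember the split under the current checkpoint id. */
    void onSplitFinished(String splitId) {
        finishedSplits
                .computeIfAbsent(currentCheckpointId, id -> new ArrayList<>())
                .add(new SketchSplit(splitId, true));
    }

    /** Returns the union of the base reader's active splits and the finished ones. */
    List<SketchSplit> snapshotState(long checkpointId, List<SketchSplit> activeSplits) {
        currentCheckpointId = checkpointId;
        List<SketchSplit> union = new ArrayList<>(activeSplits);
        finishedSplits.values().forEach(union::addAll);
        return union;
    }

    /** Assumed rule: drop entries keyed strictly below the completed checkpoint id. */
    void notifyCheckpointComplete(long checkpointId) {
        // headMap returns a live view, so clear() removes from the backing map.
        finishedSplits.headMap(checkpointId, false).clear();
    }

    /** Number of checkpoint ids that still hold finished splits. */
    int trackedCheckpoints() {
        return finishedSplits.size();
    }
}
```

   With this shape, a split that finishes after snapshot 1 is included in 
snapshot 2 and dropped once checkpoint 2 completes, so the map only ever holds 
the finished splits of checkpoints that are still in flight.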


