leekeiabstraction commented on code in PR #193:
URL: https://github.com/apache/flink-connector-aws/pull/193#discussion_r2014041555


##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java:
##########
@@ -138,14 +138,26 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
 
     /** When we mark a split as finished, we will only assign its child splits to the subtasks. */
     private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent splitsFinishedEvent) {
+        Set<DynamoDbStreamsShardSplit> splitsAssignment = splitAssignment.get(subtaskId);

Review Comment:
   Nit: place this line after `splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds());` instead, moving the `splitAssignment` declaration closer to where it is used.



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java:
##########
@@ -333,7 +333,13 @@ private boolean verifyParentIsEitherFinishedOrCleanedUp(DynamoDbStreamsShardSpli
                 || isFinished(split.getParentShardId());
     }
 
-    private boolean isFinished(String splitId) {
+    /**
+     * Provides information whether a split is finished or not.
+     *
+     * @param splitId
+     * @return

Review Comment:
   Incomplete Javadoc. Suggest something like the following:
   
   ```
   * @return      boolean value indicating if split is finished
   ```
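   Fleshed out, the method with complete Javadoc could look like the sketch below. The `Set`-backed lookup and class name are hypothetical stand-ins for the tracker's real state; only the Javadoc shape is the point.

   ```java
   import java.util.HashSet;
   import java.util.Set;

   // Hypothetical stand-in for SplitTracker; the Set-backed lookup is illustrative only.
   public class SplitTrackerSketch {
       private static final Set<String> finishedSplits = new HashSet<>();

       /**
        * Provides information on whether a split is finished or not.
        *
        * @param splitId id of the split to check
        * @return boolean value indicating if split is finished
        */
       static boolean isFinished(String splitId) {
           return finishedSplits.contains(splitId);
       }

       public static void main(String[] args) {
           finishedSplits.add("shardId-0001");
           System.out.println(isFinished("shardId-0001")); // true
           System.out.println(isFinished("shardId-0002")); // false
       }
   }
   ```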



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/DynamoDbStreamsSourceReader.java:
##########
@@ -80,8 +115,54 @@ protected DynamoDbStreamsShardSplit toSplitType(
 
     @Override
     public void addSplits(List<DynamoDbStreamsShardSplit> splits) {
-        splits.forEach(this::registerShardMetricGroup);
-        super.addSplits(splits);
+        List<DynamoDbStreamsShardSplit> dynamoDbStreamsShardSplits = new ArrayList<>();
+        for (DynamoDbStreamsShardSplit split : splits) {
+            if (split.isFinished()) {
+                // Restore finished splits state
+                splitFinishedEvents
+                        .computeIfAbsent(split.getFinishedAfterCheckpoint(), k -> new HashSet<>())
+                        .add(split);
+                // Replay the finished split event
+                context.sendSourceEventToCoordinator(
+                        new SplitsFinishedEvent(Collections.singleton(split.splitId())));
+            } else {
+                dynamoDbStreamsShardSplits.add(split);
+            }
+        }
+        dynamoDbStreamsShardSplits.forEach(this::registerShardMetricGroup);
+        super.addSplits(dynamoDbStreamsShardSplits);
+    }
+
+    /**
+     * At snapshot, we also store the pending finished split ids in the current checkpoint so that
+     * in case we have to restore the reader from state, we also send the finished split ids
+     * otherwise we run a risk of data loss during restarts of the source because of the
+     * SplitsFinishedEvent going missing.
+     *
+     * @param checkpointId
+     * @return
+     */
+    @Override
+    public List<DynamoDbStreamsShardSplit> snapshotState(long checkpointId) {
+        this.currentCheckpointId = checkpointId;
+        List<DynamoDbStreamsShardSplit> splits = super.snapshotState(checkpointId);
+
+        if (!splitFinishedEvents.isEmpty()) {
+            // Add all finished splits to the snapshot
+            splitFinishedEvents.values().forEach(splits::addAll);
+        }
+        return splits;
+    }
+
+    /**
+     * During notifyCheckpointComplete, we should clean up the state of finished splits that are
+     * older than the checkpoint id.
+     *
+     * @param checkpointId
+     */
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {
+        splitFinishedEvents.headMap(checkpointId, true).clear();

Review Comment:
   The Javadoc says we are cleaning up the state of finished splits that are older than the checkpoint id (interpreting "older" as exclusive), however we pass `true` as the inclusive argument to `headMap`. Which one is the intended behaviour?
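   For context, `NavigableMap.headMap(toKey, inclusive)` returns a view whose `clear()` removes the backing entries, and with `inclusive = true` the entry keyed by `toKey` itself is removed as well. A minimal sketch (the `TreeMap` contents are hypothetical stand-ins for `splitFinishedEvents` keyed by checkpoint id):

   ```java
   import java.util.NavigableMap;
   import java.util.TreeMap;

   public class HeadMapDemo {
       public static void main(String[] args) {
           // Hypothetical stand-in for splitFinishedEvents, keyed by checkpoint id.
           NavigableMap<Long, String> splitFinishedEvents = new TreeMap<>();
           splitFinishedEvents.put(1L, "splits finished before checkpoint 1");
           splitFinishedEvents.put(2L, "splits finished before checkpoint 2");
           splitFinishedEvents.put(3L, "splits finished before checkpoint 3");

           // inclusive = true: the view contains keys <= 2
           System.out.println(splitFinishedEvents.headMap(2L, true).keySet());  // [1, 2]
           // inclusive = false: strictly less than 2
           System.out.println(splitFinishedEvents.headMap(2L, false).keySet()); // [1]

           // Clearing the inclusive view also drops checkpoint 2's state.
           splitFinishedEvents.headMap(2L, true).clear();
           System.out.println(splitFinishedEvents.keySet()); // [3]
       }
   }
   ```

   So `headMap(checkpointId, true).clear()` removes state at and before `checkpointId`, not just strictly before it.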



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplit.java:
##########
@@ -40,12 +40,22 @@ public final class DynamoDbStreamsShardSplit implements SourceSplit {
     private final String shardId;
     private final StartingPosition startingPosition;
     private final String parentShardId;
+    private final long finishedAfterCheckpoint;

Review Comment:
   Perhaps more of a question than a comment. Would `finishedDuringCheckpoint` be more accurate? If I understand correctly, checkpointing is still in progress, and the issue occurs when the split finishes before the reader has completed the checkpoint.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
