AHeise commented on code in PR #193:
URL: https://github.com/apache/flink-connector-aws/pull/193#discussion_r2011444992


##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java:
##########
@@ -138,14 +138,26 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
 
     /** When we mark a split as finished, we will only assign its child splits to the subtasks. */
     private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent splitsFinishedEvent) {
+        Set<DynamoDbStreamsShardSplit> splitsAssignment = splitAssignment.get(subtaskId);

Review Comment:
   Unrelated to this change: have you considered keeping the assignment in a Map
   indexed by split id instead of a Set?
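
   A minimal sketch of what a split-id-indexed assignment could look like. The
   class names `SplitStub` and `SplitAssignmentIndex` are hypothetical stand-ins,
   not types from the connector; the point is only that a finished split can be
   looked up and removed by id without scanning a Set.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for DynamoDbStreamsShardSplit; only the id matters here. */
final class SplitStub {
    private final String splitId;

    SplitStub(String splitId) {
        this.splitId = splitId;
    }

    String splitId() {
        return splitId;
    }
}

/** Hypothetical assignment index: subtaskId -> (splitId -> split). */
final class SplitAssignmentIndex {
    private final Map<Integer, Map<String, SplitStub>> assignment = new HashMap<>();

    /** Records that the split is assigned to the given subtask. */
    void assign(int subtaskId, SplitStub split) {
        assignment.computeIfAbsent(subtaskId, k -> new HashMap<>()).put(split.splitId(), split);
    }

    /** Removes the split by id; returns null if it was not assigned to this subtask. */
    SplitStub finish(int subtaskId, String splitId) {
        Map<String, SplitStub> splits = assignment.get(subtaskId);
        return splits == null ? null : splits.remove(splitId);
    }

    /** Number of splits currently assigned to the subtask. */
    int assignedCount(int subtaskId) {
        Map<String, SplitStub> splits = assignment.get(subtaskId);
        return splits == null ? 0 : splits.size();
    }
}
```

   With this shape, handling a finished-split event becomes a single `remove`
   per split id, and an unknown id shows up as a `null` return.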



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/DynamoDbStreamsSourceReader.java:
##########
@@ -80,8 +105,65 @@ protected DynamoDbStreamsShardSplit toSplitType(
 
     @Override
     public void addSplits(List<DynamoDbStreamsShardSplit> splits) {
-        splits.forEach(this::registerShardMetricGroup);
-        super.addSplits(splits);
+        List<DynamoDbStreamsShardSplit> dynamoDbStreamsShardSplits = new ArrayList<>();
+        for (DynamoDbStreamsShardSplit split : splits) {
+            if (split.getShardId().equals(DYNAMODB_STREAMS_COMPLETED_SHARD_ID)) {
+                // Restore finished splits state
+                splitFinishedEvents.putAll(split.getFinishedSplitsMap());
+
+                // Replay all stored finished events
+                splitFinishedEvents.values().stream()
+                        .flatMap(Set::stream)
+                        .collect(Collectors.toSet())
+                        .forEach(
+                                splitId ->
+                                        context.sendSourceEventToCoordinator(
+                                                new SplitsFinishedEvent(
+                                                        Collections.singleton(splitId))));
+            } else {
+                dynamoDbStreamsShardSplits.add(split);
+            }
+        }
+        dynamoDbStreamsShardSplits.forEach(this::registerShardMetricGroup);
+        super.addSplits(dynamoDbStreamsShardSplits);
+    }
+
+    /**
+     * At snapshot, we also store the pending finished split ids in the current checkpoint so that
+     * in case we have to restore the reader from state, we also send the finished split ids
+     * otherwise we run a risk of data loss during restarts of the source because of the
+     * SplitsFinishedEvent going missing.
+     *
+     * @param checkpointId
+     * @return
+     */
+    @Override
+    public List<DynamoDbStreamsShardSplit> snapshotState(long checkpointId) {
+        this.currentCheckpointId = checkpointId;
+        List<DynamoDbStreamsShardSplit> splits = super.snapshotState(checkpointId);
+
+        if (!splitFinishedEvents.isEmpty()) {
+            // Always add a dedicated split for finished state
+            splits.add(
+                    new DynamoDbStreamsShardSplit(
+                            "",
+                            DYNAMODB_STREAMS_COMPLETED_SHARD_ID,
+                            StartingPosition.fromStart(),
+                            null,
+                            splitFinishedEvents));
+        }
+        return splits;

Review Comment:
   While reviewing, I came up with an alternative approach: add a flag to
   `DynamoDbStreamsShardSplit` that signals the finished state. You then need to
   keep the split around for one checkpoint longer.
   Note that the snapshot does not need to know at which checkpoint the split
   finished (so no TreeMap in the snapshot); it only needs to know whether the
   split finished before the checkpoint.
   
   One way would be to add a new field `finishedAfterCheckpoint` to the split
   snapshot. You set it in `onSplitFinished` but keep the split in your state
   (potentially by maintaining a `finishedSplit` field). In
   `notifyCheckpointCompleted`, you can then remove all splits where
   `finishedAfterCheckpoint < checkpointId`.
   On recovery, you ignore the specific value of `finishedAfterCheckpoint` and
   use it only as a flag to resend the event.
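
   The bookkeeping described above could be sketched roughly as follows. This is
   an illustration under assumptions, not the connector's code:
   `FinishedSplitTracker` and its method names are hypothetical, splits are
   reduced to their ids, and `-1` is an arbitrary marker for "no snapshot taken
   yet".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical reader-side bookkeeping; not the connector's actual classes. */
final class FinishedSplitTracker {
    // splitId -> id of the last checkpoint snapshotted before the split finished
    private final Map<String, Long> finishedAfterCheckpoint = new HashMap<>();
    // Assumption: -1 means no snapshot has been taken yet.
    private long lastSnapshotCheckpointId = -1L;

    /** Called when a split finishes: keep it in state instead of dropping it. */
    void onSplitFinished(String splitId) {
        finishedAfterCheckpoint.put(splitId, lastSnapshotCheckpointId);
    }

    /** Returns the finished split ids that must be written into this snapshot. */
    List<String> snapshotState(long checkpointId) {
        lastSnapshotCheckpointId = checkpointId;
        return new ArrayList<>(finishedAfterCheckpoint.keySet());
    }

    /** Once a later checkpoint has completed, the finished split can be forgotten. */
    void notifyCheckpointComplete(long checkpointId) {
        finishedAfterCheckpoint.values().removeIf(finishedAfter -> finishedAfter < checkpointId);
    }
}
```

   A split that finishes after checkpoint 1 is written into checkpoint 2 and is
   only dropped once checkpoint 2 (or a later one) is reported complete. On
   recovery, every restored finished id would simply be resent to the
   enumerator, whatever value was stored for it.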



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/assigner/UniformShardAssigner.java:
##########
@@ -54,7 +54,8 @@ public int assign(DynamoDbStreamsShardSplit split, Context context) {
 
         Preconditions.checkArgument(
                 selectedSubtask != -1,
-                "Expected at least one registered reader. Unable to assign split.");
+                "Expected at least one registered reader. Unable to assign split with id: "
+                        + split.splitId());

Review Comment:
   nit: I'd use the `Preconditions.checkArgument` overload that takes a message
   template plus arguments. The message is then only formatted when the check
   actually fails, so you avoid the string concatenation on every call.
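
   To illustrate the difference, here is a self-contained sketch. The helper
   `LazyMessageCheck.checkArgument` is a hypothetical stand-in that mimics a
   template-plus-arguments overload; it uses `String.format` for simplicity, and
   the real Flink `Preconditions` class may format its message differently.

```java
/** Hypothetical stand-in for a precondition helper with a template overload. */
final class LazyMessageCheck {
    private LazyMessageCheck() {}

    /**
     * Throws IllegalArgumentException if the condition is false. The message is
     * built from the template only on failure, never on the happy path.
     */
    static void checkArgument(boolean condition, String template, Object... args) {
        if (!condition) {
            throw new IllegalArgumentException(String.format(template, args));
        }
    }

    /** Example call site shaped like the assigner check in the diff above. */
    static int requireSubtask(int selectedSubtask, String splitId) {
        checkArgument(
                selectedSubtask != -1,
                "Expected at least one registered reader. Unable to assign split with id: %s",
                splitId);
        return selectedSubtask;
    }
}
```

   In the PR itself the call would presumably look similar, with
   `split.splitId()` passed as the template argument instead of being
   concatenated into the message.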



##########
flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/reader/DynamoDbStreamsSourceReaderTest.java:
##########
@@ -158,4 +165,148 @@ void testAddSplitsRegistersAndUpdatesShardMetricGroup() throws Exception {
         TestUtil.assertMillisBehindLatest(
                 split, TestUtil.MILLIS_BEHIND_LATEST_TEST_VALUE, metricListener);
     }
+
+    @Test
+    void testSnapshotStateWithFinishedSplits() throws Exception {
+        // Create and add a split
+        DynamoDbStreamsShardSplit split = getTestSplit();
+        List<DynamoDbStreamsShardSplit> splits = Collections.singletonList(split);
+        sourceReader.addSplits(splits);
+
+        // Set checkpoint ID by taking initial snapshot
+        List<DynamoDbStreamsShardSplit> initialSnapshot = sourceReader.snapshotState(1L);
+        assertThat(initialSnapshot).hasSize(1).containsExactly(split);
+
+        // Simulate split finishing
+        Map<String, DynamoDbStreamsShardSplitState> finishedSplits = new HashMap<>();
+        finishedSplits.put(split.splitId(), new DynamoDbStreamsShardSplitState(split));
+        sourceReader.onSplitFinished(finishedSplits);
+
+        // Take another snapshot
+        List<DynamoDbStreamsShardSplit> snapshotSplits = sourceReader.snapshotState(1L);
+
+        // Verify we have 2 splits - the original split and the state split
+        assertThat(snapshotSplits).hasSize(2);
+
+        // Find and verify the state split
+        Optional<DynamoDbStreamsShardSplit> stateSplitOptional =
+                snapshotSplits.stream()
+                        .filter(s -> s.getShardId().equals("finished-splits-state"))
+                        .findFirst();
+
+        assertThat(stateSplitOptional)
+                .isPresent()
+                .hasValueSatisfying(
+                        stateSplit -> {
+                            NavigableMap<Long, Set<String>> finishedSplitsMap =
+                                    stateSplit.getFinishedSplitsMap();
+                            assertThat(finishedSplitsMap).hasSize(1).containsKey(1L);
+                            assertThat(finishedSplitsMap.get(1L)).contains(split.splitId());
+                        });
+
+        // Find and verify the original split
+        Optional<DynamoDbStreamsShardSplit> originalSplitOptional =
+                snapshotSplits.stream()
+                        .filter(s -> s.getShardId().equals(split.getShardId()))
+                        .findFirst();
+
+        assertThat(originalSplitOptional)
+                .isPresent()
+                .hasValueSatisfying(
+                        originalSplit ->
+                                assertThat(originalSplit)
+                                        .usingRecursiveComparison()
+                                        .isEqualTo(split));

Review Comment:
   nit: you can shorten this to
   
   ```
           assertThat(snapshotSplits)
                   .hasSize(2)
                   .contains(split)
                   .filteredOn(s -> s.getShardId().equals("finished-splits-state"))
                   .singleElement()
                   .extracting(
                           DynamoDbStreamsShardSplit::getFinishedSplitsMap,
                           as(InstanceOfAssertFactories.map(Long.class, Set.class)))
                   .containsExactly(Map.entry(1L, Set.of(split.splitId())));
   ```



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplit.java:
##########
@@ -40,6 +43,7 @@ public final class DynamoDbStreamsShardSplit implements 
SourceSplit {
     private final String shardId;
     private final StartingPosition startingPosition;
     private final String parentShardId;
+    private final NavigableMap<Long, Set<String>> finishedSplitsMap;

Review Comment:
   As mentioned above, you don't need to store a TreeMap here (it would only be
   needed in the reader). You can flatten the values into a single collection,
   which makes serialization and recovery easier.
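
   A small sketch of the flattening step, assuming the reader keeps a
   `NavigableMap<Long, Set<String>>` keyed by checkpoint id as in the diff above.
   The class `FinishedSplitsFlattener` is hypothetical and only shows the shape
   of the data that would end up in the split snapshot.

```java
import java.util.NavigableMap;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical helper: collapses per-checkpoint finished ids into one set. */
final class FinishedSplitsFlattener {
    private FinishedSplitsFlattener() {}

    /** Drops the checkpoint ids and keeps only the distinct finished split ids. */
    static Set<String> flatten(NavigableMap<Long, Set<String>> finishedSplitsByCheckpoint) {
        return finishedSplitsByCheckpoint.values().stream()
                .flatMap(Set::stream)
                .collect(Collectors.toSet());
    }
}
```

   The serializer would then only need to write a flat collection of strings,
   and recovery can resend an event for each id without caring which checkpoint
   it belonged to.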



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.