AHeise commented on code in PR #193:
URL: https://github.com/apache/flink-connector-aws/pull/193#discussion_r2011444992
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java:
##########
@@ -138,14 +138,26 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
     /** When we mark a split as finished, we will only assign its child splits to the subtasks. */
     private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent splitsFinishedEvent) {
+        Set<DynamoDbStreamsShardSplit> splitsAssignment = splitAssignment.get(subtaskId);
Review Comment:
Unrelated, have you considered a Map indexed by split id?
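A minimal sketch of that idea, assuming the enumerator keeps one map per subtask; the `getFinishedSplitIds` accessor on the event is an assumption for illustration, not confirmed connector API:
```
// Sketch only: index each subtask's assignment by split id for O(1) lookup/removal.
private final Map<Integer, Map<String, DynamoDbStreamsShardSplit>> splitAssignment =
        new HashMap<>();

private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent splitsFinishedEvent) {
    Map<String, DynamoDbStreamsShardSplit> assignment =
            splitAssignment.getOrDefault(subtaskId, Collections.emptyMap());
    // getFinishedSplitIds() is a hypothetical accessor for the event's split ids.
    for (String finishedSplitId : splitsFinishedEvent.getFinishedSplitIds()) {
        DynamoDbStreamsShardSplit finished = assignment.remove(finishedSplitId);
        if (finished != null) {
            // ... assign the child splits of `finished` to subtasks ...
        }
    }
}
```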
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/DynamoDbStreamsSourceReader.java:
##########
@@ -80,8 +105,65 @@ protected DynamoDbStreamsShardSplit toSplitType(
     @Override
     public void addSplits(List<DynamoDbStreamsShardSplit> splits) {
-        splits.forEach(this::registerShardMetricGroup);
-        super.addSplits(splits);
+        List<DynamoDbStreamsShardSplit> dynamoDbStreamsShardSplits = new ArrayList<>();
+        for (DynamoDbStreamsShardSplit split : splits) {
+            if (split.getShardId().equals(DYNAMODB_STREAMS_COMPLETED_SHARD_ID)) {
+                // Restore finished splits state
+                splitFinishedEvents.putAll(split.getFinishedSplitsMap());
+
+                // Replay all stored finished events
+                splitFinishedEvents.values().stream()
+                        .flatMap(Set::stream)
+                        .collect(Collectors.toSet())
+                        .forEach(
+                                splitId ->
+                                        context.sendSourceEventToCoordinator(
+                                                new SplitsFinishedEvent(
+                                                        Collections.singleton(splitId))));
+            } else {
+                dynamoDbStreamsShardSplits.add(split);
+            }
+        }
+        dynamoDbStreamsShardSplits.forEach(this::registerShardMetricGroup);
+        super.addSplits(dynamoDbStreamsShardSplits);
+    }
+
+    /**
+     * At snapshot time, we also store the pending finished split ids in the current checkpoint, so
+     * that if we have to restore the reader from state, we can resend the finished split ids;
+     * otherwise we risk data loss during restarts of the source because the SplitsFinishedEvent
+     * goes missing.
+     *
+     * @param checkpointId the id of the checkpoint being taken
+     * @return the splits to store in this checkpoint
+     */
+    @Override
+    public List<DynamoDbStreamsShardSplit> snapshotState(long checkpointId) {
+        this.currentCheckpointId = checkpointId;
+        List<DynamoDbStreamsShardSplit> splits = super.snapshotState(checkpointId);
+
+        if (!splitFinishedEvents.isEmpty()) {
+            // Always add a dedicated split for finished state
+            splits.add(
+                    new DynamoDbStreamsShardSplit(
+                            "",
+                            DYNAMODB_STREAMS_COMPLETED_SHARD_ID,
+                            StartingPosition.fromStart(),
+                            null,
+                            splitFinishedEvents));
+        }
+        return splits;
Review Comment:
While reviewing, I came up with an alternative approach where you use a flag in `DynamoDbStreamsShardSplit` that signals the finished state. Then you need to keep the split around one checkpoint longer.
Note that in the snapshot, you don't need to know at which checkpoint the split finished (no need for the TreeMap in the snapshot); you just need to know whether a split finished before the checkpoint or not.
So one way would be to have a new field `finishedAfterCheckpoint` in the split snapshot, which you set in `onSplitFinished` while keeping the split in your state (potentially by maintaining a `finishedSplit` field). In `notifyCheckpointComplete`, you can remove all splits where `finishedAfterCheckpoint < checkpointId`.
On recovery, you ignore the specific value of `finishedAfterCheckpoint` and just use it as a flag to resend the event.
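A rough sketch of that flag-based alternative on the reader side; `finishedSplits`, `getSplit`, `withFinishedAfterCheckpoint`, and `getFinishedAfterCheckpoint` are illustrative names for this sketch, not existing connector API:
```
// Sketch only: retain finished splits one checkpoint longer instead of a dedicated state split.
private final Map<String, DynamoDbStreamsShardSplit> finishedSplits = new HashMap<>();

@Override
protected void onSplitFinished(Map<String, DynamoDbStreamsShardSplitState> finished) {
    // Tag each finished split with the checkpoint it finished after, but keep it in state.
    finished.forEach(
            (splitId, state) ->
                    finishedSplits.put(
                            splitId,
                            // getSplit()/withFinishedAfterCheckpoint() are hypothetical here.
                            state.getSplit().withFinishedAfterCheckpoint(currentCheckpointId)));
}

@Override
public List<DynamoDbStreamsShardSplit> snapshotState(long checkpointId) {
    currentCheckpointId = checkpointId;
    List<DynamoDbStreamsShardSplit> splits = super.snapshotState(checkpointId);
    splits.addAll(finishedSplits.values()); // finished splits ride along for one more checkpoint
    return splits;
}

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
    super.notifyCheckpointComplete(checkpointId);
    // Once a checkpoint covering the finish has completed, those splits can be dropped.
    finishedSplits.values().removeIf(s -> s.getFinishedAfterCheckpoint() < checkpointId);
}
```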
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/assigner/UniformShardAssigner.java:
##########
@@ -54,7 +54,8 @@ public int assign(DynamoDbStreamsShardSplit split, Context context) {
         Preconditions.checkArgument(
                 selectedSubtask != -1,
-                "Expected at least one registered reader. Unable to assign split.");
+                "Expected at least one registered reader. Unable to assign split with id: "
+                        + split.splitId());
Review Comment:
nit: I'd use the overload that uses String.format under the hood. Then, you
avoid string concatenation until an actual issue arises.
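For reference, `org.apache.flink.util.Preconditions` has a `checkArgument(boolean, String, Object...)` overload that formats via `String.format`, so the message is only built when the check actually fails:
```
Preconditions.checkArgument(
        selectedSubtask != -1,
        "Expected at least one registered reader. Unable to assign split with id: %s.",
        split.splitId());
```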
##########
flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/reader/DynamoDbStreamsSourceReaderTest.java:
##########
@@ -158,4 +165,148 @@ void testAddSplitsRegistersAndUpdatesShardMetricGroup() throws Exception {
         TestUtil.assertMillisBehindLatest(
                 split, TestUtil.MILLIS_BEHIND_LATEST_TEST_VALUE, metricListener);
     }
+
+    @Test
+    void testSnapshotStateWithFinishedSplits() throws Exception {
+        // Create and add a split
+        DynamoDbStreamsShardSplit split = getTestSplit();
+        List<DynamoDbStreamsShardSplit> splits = Collections.singletonList(split);
+        sourceReader.addSplits(splits);
+
+        // Set checkpoint ID by taking initial snapshot
+        List<DynamoDbStreamsShardSplit> initialSnapshot = sourceReader.snapshotState(1L);
+        assertThat(initialSnapshot).hasSize(1).containsExactly(split);
+
+        // Simulate split finishing
+        Map<String, DynamoDbStreamsShardSplitState> finishedSplits = new HashMap<>();
+        finishedSplits.put(split.splitId(), new DynamoDbStreamsShardSplitState(split));
+        sourceReader.onSplitFinished(finishedSplits);
+
+        // Take another snapshot
+        List<DynamoDbStreamsShardSplit> snapshotSplits = sourceReader.snapshotState(1L);
+
+        // Verify we have 2 splits - the original split and the state split
+        assertThat(snapshotSplits).hasSize(2);
+
+        // Find and verify the state split
+        Optional<DynamoDbStreamsShardSplit> stateSplitOptional =
+                snapshotSplits.stream()
+                        .filter(s -> s.getShardId().equals("finished-splits-state"))
+                        .findFirst();
+
+        assertThat(stateSplitOptional)
+                .isPresent()
+                .hasValueSatisfying(
+                        stateSplit -> {
+                            NavigableMap<Long, Set<String>> finishedSplitsMap =
+                                    stateSplit.getFinishedSplitsMap();
+                            assertThat(finishedSplitsMap).hasSize(1).containsKey(1L);
+                            assertThat(finishedSplitsMap.get(1L)).contains(split.splitId());
+                        });
+
+        // Find and verify the original split
+        Optional<DynamoDbStreamsShardSplit> originalSplitOptional =
+                snapshotSplits.stream()
+                        .filter(s -> s.getShardId().equals(split.getShardId()))
+                        .findFirst();
+
+        assertThat(originalSplitOptional)
+                .isPresent()
+                .hasValueSatisfying(
+                        originalSplit ->
+                                assertThat(originalSplit)
+                                        .usingRecursiveComparison()
+                                        .isEqualTo(split));
Review Comment:
nit: you can shorten this to
```
assertThat(snapshotSplits)
        .hasSize(2)
        .contains(split)
        .filteredOn(s -> s.getShardId().equals("finished-splits-state"))
        .singleElement()
        .extracting(
                DynamoDbStreamsShardSplit::getFinishedSplitsMap,
                as(InstanceOfAssertFactories.map(Long.class, Set.class)))
        .containsExactly(Map.entry(1L, Set.of(split.splitId())));
```
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplit.java:
##########
@@ -40,6 +43,7 @@ public final class DynamoDbStreamsShardSplit implements SourceSplit {
private final String shardId;
private final StartingPosition startingPosition;
private final String parentShardId;
+ private final NavigableMap<Long, Set<String>> finishedSplitsMap;
Review Comment:
As mentioned above, you don't need to store a TreeMap here (it would only be needed in the reader). You can flatten the values into a single collection, which makes serialization and recovery easier.
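Concretely, the flattened form of the field could look like this (a sketch; the field name is illustrative):
```
// Sketch only: a flat set of finished split ids is sufficient in the snapshot;
// the per-checkpoint TreeMap bookkeeping can stay local to the reader.
private final Set<String> finishedSplitIds;
```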
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]