gguptp commented on code in PR #193:
URL:
https://github.com/apache/flink-connector-aws/pull/193#discussion_r2014045507
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/DynamoDbStreamsSourceReader.java:
##########
@@ -80,8 +115,54 @@ protected DynamoDbStreamsShardSplit toSplitType(
@Override
public void addSplits(List<DynamoDbStreamsShardSplit> splits) {
- splits.forEach(this::registerShardMetricGroup);
- super.addSplits(splits);
+ List<DynamoDbStreamsShardSplit> dynamoDbStreamsShardSplits = new ArrayList<>();
+ for (DynamoDbStreamsShardSplit split : splits) {
+ if (split.isFinished()) {
+ // Restore finished splits state
+ splitFinishedEvents
+ .computeIfAbsent(split.getFinishedAfterCheckpoint(), k -> new HashSet<>())
+ .add(split);
+ // Replay the finished split event
+ context.sendSourceEventToCoordinator(
+ new SplitsFinishedEvent(Collections.singleton(split.splitId())));
+ } else {
+ dynamoDbStreamsShardSplits.add(split);
+ }
+ }
+ dynamoDbStreamsShardSplits.forEach(this::registerShardMetricGroup);
+ super.addSplits(dynamoDbStreamsShardSplits);
+ }
+
+ /**
+ * At snapshot, we also store the pending finished split ids in the current checkpoint so that
+ * in case we have to restore the reader from state, we also send the finished split ids
+ * otherwise we run a risk of data loss during restarts of the source because of the
+ * SplitsFinishedEvent going missing.
+ *
+ * @param checkpointId
+ * @return
+ */
+ @Override
+ public List<DynamoDbStreamsShardSplit> snapshotState(long checkpointId) {
+ this.currentCheckpointId = checkpointId;
+ List<DynamoDbStreamsShardSplit> splits = super.snapshotState(checkpointId);
+
+ if (!splitFinishedEvents.isEmpty()) {
+ // Add all finished splits to the snapshot
+ splitFinishedEvents.values().forEach(splits::addAll);
+ }
+ return splits;
+ }
+
+ /**
+ * During notifyCheckpointComplete, we should clean up the state of finished splits that are
+ * older than the checkpoint id.
+ *
+ * @param checkpointId
+ */
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ splitFinishedEvents.headMap(checkpointId, true).clear();
Review Comment:
Good point: any splits buffered under a checkpoint id <= this checkpoint will be cleared.
The intended behaviour is as follows:
- Checkpoint 1 is taken.
- Any split-finished events that occur between checkpoint 1 and checkpoint 2 are buffered in splitFinishedEvents.
- Checkpoint 2 is taken. It contains all split-finished events that happened after checkpoint 1.
- notifyCheckpointComplete removes everything buffered for checkpoints 1 and 2.
- A new buffer accumulates between checkpoint 2 and checkpoint 3.
- notifyCheckpointComplete removes everything buffered for checkpoints <= 3.

I will update the Javadoc to state explicitly that the bound is inclusive.
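
To make the timeline above concrete, here is a minimal standalone sketch of the buffer lifecycle. It is not the connector code: the class name FinishedSplitBufferSketch, the onSplitFinished and bufferedCheckpointIds helpers, the use of plain String split ids, and the initial checkpoint id of 0 are all assumptions made for illustration. Only the NavigableMap.headMap(key, inclusive) call mirrors the line under review.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical stand-in for the reader's finished-split buffer; split ids are plain strings. */
final class FinishedSplitBufferSketch {
    // Keyed by the id of the last checkpoint taken before the split finished.
    private final NavigableMap<Long, Set<String>> splitFinishedEvents = new TreeMap<>();
    private long currentCheckpointId = 0L; // assumed initial value

    /** Buffers a finished split under the most recent checkpoint id. */
    public void onSplitFinished(String splitId) {
        splitFinishedEvents
                .computeIfAbsent(currentCheckpointId, k -> new HashSet<>())
                .add(splitId);
    }

    /** Records the checkpoint id and returns every finished split still buffered. */
    public List<String> snapshotState(long checkpointId) {
        currentCheckpointId = checkpointId;
        List<String> snapshot = new ArrayList<>();
        splitFinishedEvents.values().forEach(snapshot::addAll);
        return snapshot;
    }

    /** Drops every entry whose key is <= checkpointId (inclusive bound). */
    public void notifyCheckpointComplete(long checkpointId) {
        splitFinishedEvents.headMap(checkpointId, true).clear();
    }

    /** Returns a copy of the checkpoint ids that still have buffered splits. */
    public Set<Long> bufferedCheckpointIds() {
        return new TreeSet<>(splitFinishedEvents.keySet());
    }

    public static void main(String[] args) {
        FinishedSplitBufferSketch reader = new FinishedSplitBufferSketch();
        reader.snapshotState(1);                      // checkpoint 1 is taken
        reader.onSplitFinished("shard-A");            // buffered under key 1
        System.out.println(reader.snapshotState(2));  // checkpoint 2 carries shard-A
        reader.notifyCheckpointComplete(2);           // clears keys <= 2
        reader.onSplitFinished("shard-B");            // buffered under key 2
        System.out.println(reader.snapshotState(3));  // checkpoint 3 carries shard-B
        reader.notifyCheckpointComplete(3);           // clears keys <= 3
        System.out.println(reader.bufferedCheckpointIds());
    }
}
```

Because headMap returns a view of the underlying map, calling clear() on it removes the matching entries from the map itself. With the second argument set to false, an entry keyed by exactly the completed checkpoint id would stay in the buffer.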