AHeise commented on code in PR #193:
URL:
https://github.com/apache/flink-connector-aws/pull/193#discussion_r2014211861
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/DynamoDbStreamsSourceReader.java:
##########
@@ -80,8 +115,55 @@ protected DynamoDbStreamsShardSplit toSplitType(
@Override
public void addSplits(List<DynamoDbStreamsShardSplit> splits) {
- splits.forEach(this::registerShardMetricGroup);
- super.addSplits(splits);
+ List<DynamoDbStreamsShardSplit> dynamoDbStreamsShardSplits = new ArrayList<>();
+ for (DynamoDbStreamsShardSplit split : splits) {
+ if (split.isFinished()) {
+ // Replay the finished split event.
+ // We don't need to reload the split finished events in buffer back
+ // since if the next checkpoint completes, these would just be removed from the
+ // buffer. If the next checkpoint doesn't complete,
+ // we would go back to the previous checkpointed
+ // state which will again replay these split finished events.
+ context.sendSourceEventToCoordinator(
+ new SplitsFinishedEvent(Collections.singleton(split.splitId())));
+ } else {
+ dynamoDbStreamsShardSplits.add(split);
+ }
+ }
+ dynamoDbStreamsShardSplits.forEach(this::registerShardMetricGroup);
+ super.addSplits(dynamoDbStreamsShardSplits);
+ }
+
+ /**
+ * At snapshot, we also store the pending finished split ids in the current checkpoint so that
+ * in case we have to restore the reader from state, we also send the finished split ids
+ * otherwise we run a risk of data loss during restarts of the source because of the
+ * SplitsFinishedEvent going missing.
+ *
+ * @param checkpointId
+ * @return
+ */
+ @Override
+ public List<DynamoDbStreamsShardSplit> snapshotState(long checkpointId) {
+ this.currentCheckpointId = checkpointId;
+ List<DynamoDbStreamsShardSplit> splits = super.snapshotState(checkpointId);
+
+ if (!splitFinishedEvents.isEmpty()) {
+ // Add all finished splits to the snapshot
+ splitFinishedEvents.values().forEach(splits::addAll);
Review Comment:
This implicitly assumes that the List returned from SourceReaderBase is
mutable. I'd copy it into a new ArrayList here to be future-proof.
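
A minimal sketch of the defensive copy suggested above. It does not use the Flink classes: `String` stands in for `DynamoDbStreamsShardSplit`, and the hypothetical `baseSnapshot()` stands in for `super.snapshotState(checkpointId)`. It deliberately returns an unmodifiable list to show why appending to the returned list directly could fail.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class SnapshotCopySketch {

    // Hypothetical stand-in for super.snapshotState(checkpointId).
    // Returns an unmodifiable list on purpose: calling addAll on it would
    // throw UnsupportedOperationException.
    static List<String> baseSnapshot() {
        return Collections.unmodifiableList(Arrays.asList("shard-0", "shard-1"));
    }

    // Copies the base snapshot into a fresh ArrayList before appending the
    // buffered finished splits, so the result never depends on the mutability
    // of the list handed back by the base class.
    static List<String> snapshotWithFinished(Map<Long, Set<String>> splitFinishedEvents) {
        List<String> splits = new ArrayList<>(baseSnapshot());
        splitFinishedEvents.values().forEach(splits::addAll);
        return splits;
    }

    public static void main(String[] args) {
        Map<Long, Set<String>> finished = new TreeMap<>();
        finished.put(1L, new HashSet<>(Collections.singleton("shard-2")));
        System.out.println(snapshotWithFinished(finished));
    }
}
```

In the actual reader, the same idea would amount to wrapping the result of `super.snapshotState(checkpointId)` in `new ArrayList<>(...)` before the `addAll` calls.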
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/DynamoDbStreamsSourceReader.java:
##########
@@ -58,10 +65,38 @@ public DynamoDbStreamsSourceReader(
Map<String, DynamoDbStreamsShardMetrics> shardMetricGroupMap) {
super(splitFetcherManager, recordEmitter, config, context);
this.shardMetricGroupMap = shardMetricGroupMap;
+ this.splitFinishedEvents = new TreeMap<>();
+ this.currentCheckpointId = Long.MIN_VALUE;
}
+ /**
+ * We store the finished splits in a map keyed by the checkpoint id.
+ *
+ * @param finishedSplitIds
+ */
@Override
protected void onSplitFinished(Map<String, DynamoDbStreamsShardSplitState> finishedSplitIds) {
+ if (finishedSplitIds.isEmpty()) {
+ return;
+ }
+
+ finishedSplitIds.values().stream()
+ .map(
+ finishedSplit ->
+ new DynamoDbStreamsShardSplit(
+ finishedSplit.getStreamArn(),
+ finishedSplit.getShardId(),
+ finishedSplit.getNextStartingPosition(),
+ finishedSplit
+ .getDynamoDbStreamsShardSplit()
+ .getParentShardId(),
+ true))
+ .forEach(
+ split ->
+ splitFinishedEvents
+ .computeIfAbsent(currentCheckpointId, k -> new HashSet<>())
Review Comment:
Nit: you can pull the `computeIfAbsent` lookup out of the loop.
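
A rough sketch of what this could look like, assuming the checkpoint id does not change while the finished splits are being buffered. `String` stands in for `DynamoDbStreamsShardSplit`, and the class and method names are hypothetical, not part of the connector.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;

public class FinishedSplitBufferSketch {

    // Buffers finished split ids under the given checkpoint id.
    // The bucket is resolved once, before iterating, instead of calling
    // computeIfAbsent for every element inside forEach.
    static void bufferFinishedSplits(
            NavigableMap<Long, Set<String>> splitFinishedEvents,
            long currentCheckpointId,
            Collection<String> finishedSplitIds) {
        if (finishedSplitIds.isEmpty()) {
            return;
        }
        Set<String> bucket =
                splitFinishedEvents.computeIfAbsent(currentCheckpointId, k -> new HashSet<>());
        bucket.addAll(finishedSplitIds);
    }

    public static void main(String[] args) {
        NavigableMap<Long, Set<String>> events = new TreeMap<>();
        bufferFinishedSplits(events, Long.MIN_VALUE, Arrays.asList("shard-0", "shard-1"));
        for (Map.Entry<Long, Set<String>> entry : events.entrySet()) {
            System.out.println(entry.getKey() + " -> " + entry.getValue().size() + " finished split(s)");
        }
    }
}
```

In the PR, the stream could still map each `DynamoDbStreamsShardSplitState` to a finished `DynamoDbStreamsShardSplit` and end with `.forEach(bucket::add)`; only the map lookup moves out of the per-element path.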