gguptp commented on code in PR #193:
URL:
https://github.com/apache/flink-connector-aws/pull/193#discussion_r2011546662
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/DynamoDbStreamsSourceReader.java:
##########
@@ -80,8 +105,65 @@ protected DynamoDbStreamsShardSplit toSplitType(
@Override
public void addSplits(List<DynamoDbStreamsShardSplit> splits) {
- splits.forEach(this::registerShardMetricGroup);
- super.addSplits(splits);
+ List<DynamoDbStreamsShardSplit> dynamoDbStreamsShardSplits = new ArrayList<>();
+ for (DynamoDbStreamsShardSplit split : splits) {
+ if (split.getShardId().equals(DYNAMODB_STREAMS_COMPLETED_SHARD_ID)) {
+ // Restore finished splits state
+ splitFinishedEvents.putAll(split.getFinishedSplitsMap());
+
+ // Replay all stored finished events
+ splitFinishedEvents.values().stream()
+ .flatMap(Set::stream)
+ .collect(Collectors.toSet())
+ .forEach(
+ splitId ->
+ context.sendSourceEventToCoordinator(
+ new SplitsFinishedEvent(
+ Collections.singleton(splitId))));
+ } else {
+ dynamoDbStreamsShardSplits.add(split);
+ }
+ }
+ dynamoDbStreamsShardSplits.forEach(this::registerShardMetricGroup);
+ super.addSplits(dynamoDbStreamsShardSplits);
+ }
+
+ /**
+ * At snapshot, we also store the pending finished split ids in the current checkpoint so that
+ * in case we have to restore the reader from state, we also send the finished split ids
+ * otherwise we run a risk of data loss during restarts of the source because of the
+ * SplitsFinishedEvent going missing.
+ *
+ * @param checkpointId
+ * @return
+ */
+ @Override
+ public List<DynamoDbStreamsShardSplit> snapshotState(long checkpointId) {
+ this.currentCheckpointId = checkpointId;
+ List<DynamoDbStreamsShardSplit> splits = super.snapshotState(checkpointId);
+
+ if (!splitFinishedEvents.isEmpty()) {
+ // Always add a dedicated split for finished state
+ splits.add(
+ new DynamoDbStreamsShardSplit(
+ "",
+ DYNAMODB_STREAMS_COMPLETED_SHARD_ID,
+ StartingPosition.fromStart(),
+ null,
+ splitFinishedEvents));
+ }
+ return splits;
Review Comment:
The only problem here is that we don't have access to the split state
directly from the SourceReaderBase class. Given the number of shards that
DDB has, the `notifyCheckpointCompleted` function might become an O(n) operation,
compared to today, where it is an O(log(n)) operation, and that might hurt performance.
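
To make the complexity concern concrete, here is a minimal sketch, not the connector's actual code. It assumes the finished-split bookkeeping is a sorted map keyed by checkpoint id, which is consistent with how `splitFinishedEvents` is used in the diff but is not confirmed by it. The class `FinishedSplitPruning` and all of its methods are hypothetical names used only for illustration.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical illustration; none of these names come from the PR.
class FinishedSplitPruning {

    // Assumed shape of the bookkeeping: checkpoint id -> split ids finished in that checkpoint.
    private final NavigableMap<Long, Set<String>> finishedByCheckpoint = new TreeMap<>();

    void recordFinished(long checkpointId, String splitId) {
        finishedByCheckpoint.computeIfAbsent(checkpointId, k -> new HashSet<>()).add(splitId);
    }

    // Checkpoint-keyed pruning: the sorted map finds the cut-off key in O(log n)
    // and only touches the checkpoint entries that are actually removed.
    void pruneUpTo(long completedCheckpointId) {
        finishedByCheckpoint.headMap(completedCheckpointId, true).clear();
    }

    int pendingCheckpoints() {
        return finishedByCheckpoint.size();
    }

    // Per-split alternative: if the finished marker lived in each split's own state,
    // every split would have to be visited on each completed checkpoint, which is O(n).
    static int pruneByScan(Map<String, Long> finishedAtCheckpointBySplit, long completedCheckpointId) {
        int before = finishedAtCheckpointBySplit.size();
        finishedAtCheckpointBySplit.values().removeIf(cp -> cp <= completedCheckpointId);
        return before - finishedAtCheckpointBySplit.size();
    }
}
```

With the checkpoint-keyed map, the work per completed checkpoint depends on how many checkpoints are pending rather than on how many shards the reader tracks, which is the property the comment is worried about losing.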