hlteoh37 commented on code in PR #193:
URL:
https://github.com/apache/flink-connector-aws/pull/193#discussion_r2014341394
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java:
##########
@@ -139,13 +139,25 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
/** When we mark a split as finished, we will only assign its child splits to the subtasks. */
private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent splitsFinishedEvent) {
splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds());
- splitAssignment
- .get(subtaskId)
- .removeIf(
- split ->
- splitsFinishedEvent
- .getFinishedSplitIds()
- .contains(split.splitId()));
+
+ Set<DynamoDbStreamsShardSplit> splitsAssignment = splitAssignment.get(subtaskId);
+ // during recovery, splitAssignment may return null since there might be no split assigned
+ // to the subtask, but there might be SplitsFinishedEvent from that subtask.
+ // We will not do child shard assignment if that is the case since that might lead to child
+ // shards trying to get assigned before there being any readers.
+ if (splitsAssignment == null) {
Review Comment:
Good spot, Arvid!
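To illustrate why the guard matters: the removed code called `splitAssignment.get(subtaskId).removeIf(...)` directly, which throws a `NullPointerException` when a `SplitsFinishedEvent` arrives from a subtask that has no assignment after recovery. Below is a minimal, self-contained sketch of the null guard under simplifying assumptions. Split IDs are plain strings instead of `DynamoDbStreamsShardSplit`, `splitAssignment` is modelled as a `Map<Integer, Set<String>>`, and the class name and the `assign`/`assignedTo` helpers are hypothetical. The diff does not show the body of the `if` block, so returning early (and reporting whether child assignment may proceed) is also an assumption.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical, simplified model of the enumerator state (not the connector's API).
 * Split IDs are strings, and child-split assignment is reduced to a boolean result.
 */
public class FinishedSplitHandlingSketch {
    // subtask ID -> IDs of splits currently assigned to that subtask
    private final Map<Integer, Set<String>> splitAssignment = new HashMap<>();

    public void assign(int subtaskId, String splitId) {
        splitAssignment.computeIfAbsent(subtaskId, k -> new HashSet<>()).add(splitId);
    }

    /**
     * Returns true if child splits may be assigned afterwards, or false if the event
     * was ignored because the subtask has no recorded assignment (e.g. after recovery).
     */
    public boolean handleFinishedSplits(int subtaskId, Set<String> finishedSplitIds) {
        Set<String> assigned = splitAssignment.get(subtaskId);
        if (assigned == null) {
            // Null guard: skip child-shard assignment instead of throwing an NPE.
            return false;
        }
        // Drop finished splits from this subtask's assignment.
        assigned.removeIf(finishedSplitIds::contains);
        return true;
    }

    public Set<String> assignedTo(int subtaskId) {
        return splitAssignment.getOrDefault(subtaskId, Set.of());
    }

    public static void main(String[] args) {
        FinishedSplitHandlingSketch sketch = new FinishedSplitHandlingSketch();
        sketch.assign(0, "shard-1");
        sketch.assign(0, "shard-2");
        System.out.println(sketch.handleFinishedSplits(0, Set.of("shard-1"))); // true
        System.out.println(sketch.assignedTo(0)); // [shard-2]
        // Subtask 1 has no assignment, as can happen right after recovery.
        System.out.println(sketch.handleFinishedSplits(1, Set.of("shard-3"))); // false
    }
}
```

Returning a flag rather than throwing keeps the enumerator alive and leaves child-shard assignment for later, once readers have registered, which matches the intent stated in the diff's comment.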
--
This is an automated message from the Apache Git Service.