This is an automated email from the ASF dual-hosted git repository.
hong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git
The following commit(s) were added to refs/heads/main by this push:
new 9b0d983 [FLINK-36270][Connectors/DynamoDB] Assign only children
splits of a split when it is marked as finished
9b0d983 is described below
commit 9b0d98343a892be64d25d26064108aa27a255bd0
Author: gguptp <[email protected]>
AuthorDate: Thu Sep 12 17:40:26 2024 +0530
[FLINK-36270][Connectors/DynamoDB] Assign only children splits of a split
when it is marked as finished
---
.../DynamoDbStreamsSourceEnumerator.java | 21 +++++-
.../source/enumerator/tracker/SplitTracker.java | 74 ++++++++++++++++---
.../enumerator/tracker/SplitTrackerTest.java | 85 ++++++++++++++++++++++
3 files changed, 166 insertions(+), 14 deletions(-)
diff --git
a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java
b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java
index e62c0f3..8a8f29c 100644
---
a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java
+++
b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java
@@ -128,6 +128,7 @@ public class DynamoDbStreamsSourceEnumerator
}
}
+ /** When we mark a split as finished, we will only assign its child splits
to the subtasks. */
private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent
splitsFinishedEvent) {
splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds());
splitAssignment
@@ -137,7 +138,7 @@ public class DynamoDbStreamsSourceEnumerator
splitsFinishedEvent
.getFinishedSplitIds()
.contains(split.splitId()));
- assignSplits();
+ assignChildSplits(splitsFinishedEvent.getFinishedSplitIds());
}
private void processDiscoveredSplits(ListShardsResult discoveredSplits,
Throwable throwable) {
@@ -161,7 +162,7 @@ public class DynamoDbStreamsSourceEnumerator
context.registeredReaders().size());
return;
}
- assignSplits();
+ assignAllAvailableSplits();
}
/**
@@ -200,9 +201,21 @@ public class DynamoDbStreamsSourceEnumerator
return splitGraphInconsistencyTracker;
}
- private void assignSplits() {
+    /** Assigns every split that the split tracker reports as available for assignment. */
+    private void assignAllAvailableSplits() {
+        assignSplits(splitTracker.splitsAvailableForAssignment());
+    }
+
+    /**
+     * Assigns only the still-unassigned child splits of the given finished splits.
+     *
+     * @param finishedSplitIds ids of the splits that were just reported as finished
+     */
+    private void assignChildSplits(Set<String> finishedSplitIds) {
+        assignSplits(splitTracker.getUnassignedChildSplits(finishedSplitIds));
+    }
+
+ private void assignSplits(List<DynamoDbStreamsShardSplit>
splitsAvailableForAssignment) {
Map<Integer, List<DynamoDbStreamsShardSplit>> newSplitAssignments =
new HashMap<>();
- for (DynamoDbStreamsShardSplit split :
splitTracker.splitsAvailableForAssignment()) {
+ for (DynamoDbStreamsShardSplit split : splitsAvailableForAssignment) {
assignSplitToSubtask(split, newSplitAssignments);
}
updateSplitAssignment(newSplitAssignments);
diff --git
a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java
b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java
index 2269ce6..73885dd 100644
---
a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java
+++
b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java
@@ -28,6 +28,8 @@ import
org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSpli
import org.apache.flink.connector.dynamodb.source.split.StartingPosition;
import org.apache.flink.connector.dynamodb.source.util.ShardUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.dynamodb.model.Shard;
import java.util.Collection;
@@ -50,10 +52,12 @@ import static
org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignm
@Internal
public class SplitTracker {
private final Map<String, DynamoDbStreamsShardSplit> knownSplits = new
ConcurrentHashMap<>();
+ private final Map<String, Set<String>> parentChildSplitMap = new
ConcurrentHashMap<>();
private final Set<String> assignedSplits = new HashSet<>();
private final Set<String> finishedSplits = new HashSet<>();
private final String streamArn;
private final InitialPosition initialPosition;
+ private static final Logger LOG =
LoggerFactory.getLogger(SplitTracker.class);
public SplitTracker(String streamArn, InitialPosition initialPosition) {
this(Collections.emptyList(), streamArn, initialPosition);
@@ -69,7 +73,7 @@ public class SplitTracker {
splitWithStatus -> {
DynamoDbStreamsShardSplit currentSplit =
splitWithStatus.split();
knownSplits.put(currentSplit.splitId(), currentSplit);
-
+ addSplitToMapping(currentSplit);
if (ASSIGNED.equals(splitWithStatus.assignmentStatus())) {
assignedSplits.add(splitWithStatus.split().splitId());
}
@@ -93,10 +97,20 @@ public class SplitTracker {
DynamoDbStreamsShardSplit newSplit =
mapToSplit(shard, getStartingPosition(shard,
discoveredShardIds));
knownSplits.put(shardId, newSplit);
+ addSplitToMapping(newSplit);
}
}
}
+    /**
+     * Registers the given split under its parent shard id in the parent-child map. Splits
+     * without a parent shard id are not recorded.
+     */
+    private void addSplitToMapping(DynamoDbStreamsShardSplit split) {
+        String parentShardId = split.getParentShardId();
+        if (parentShardId != null) {
+            parentChildSplitMap
+                    .computeIfAbsent(parentShardId, ignored -> new HashSet<>())
+                    .add(split.splitId());
+        }
+    }
+
/**
* If there is no parent in knownSplits of the current shard, that means
that the parent has
* been read already, so we start the current shard with the specified
initial position. We do
@@ -159,24 +173,63 @@ public class SplitTracker {
return assignedSplits.contains(splitId);
}
+    /**
+     * Returns the child splits of the given parent splits that can currently be assigned.
+     *
+     * <p>Only the children recorded in the parent-child map are inspected, which avoids
+     * iterating over every value in knownSplits.
+     *
+     * @param parentSplitIds ids of the parent splits whose children should be looked up
+     * @return the known child splits that pass {@code checkIfSplitCanBeAssigned}; parents with
+     *     no entry in the parent-child map contribute nothing and are logged at WARN level
+     */
+    public List<DynamoDbStreamsShardSplit> getUnassignedChildSplits(Set<String> parentSplitIds) {
+        // A sequential stream is used on purpose: the input is a handful of ids and the
+        // pipeline logs, so there is nothing to gain from the common fork-join pool.
+        return parentSplitIds.stream()
+                .map(
+                        splitId -> {
+                            // One lookup instead of containsKey + get, so a key removed
+                            // between the two calls can never hand null to flatMap below.
+                            Set<String> childSplitIds = parentChildSplitMap.get(splitId);
+                            if (childSplitIds == null) {
+                                LOG.warn(
+                                        "splitId: {} is not present in parent-child relationship map. "
+                                                + "This indicates that there might be some data loss in the application",
+                                        splitId);
+                            }
+                            return childSplitIds;
+                        })
+                .filter(childSplitIds -> childSplitIds != null)
+                .flatMap(Set::stream)
+                // Same single-lookup pattern for knownSplits: unknown children are dropped.
+                .map(knownSplits::get)
+                .filter(childSplit -> childSplit != null)
+                .filter(this::checkIfSplitCanBeAssigned)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Decides whether the given split is eligible for assignment. A split is eligible only if
+     * all of the following hold:
+     *
+     * <p>- It has not been assigned yet.
+     *
+     * <p>- It has not been marked as finished.
+     *
+     * <p>- Its parent split is either finished or no longer present in knownSplits.
+     */
+    private boolean checkIfSplitCanBeAssigned(DynamoDbStreamsShardSplit split) {
+        String splitId = split.splitId();
+        if (isAssigned(splitId) || isFinished(splitId)) {
+            return false;
+        }
+        return verifyParentIsEitherFinishedOrCleanedUp(split);
+    }
+
/**
* Since we never put an inconsistent shard lineage into splitTracker, if a shard's parent is
* not there, that means the parent has already been cleaned up.
*/
public List<DynamoDbStreamsShardSplit> splitsAvailableForAssignment() {
- return knownSplits.values().stream()
- .filter(
- split -> {
- boolean splitIsNotAssigned =
!isAssigned(split.splitId());
- return splitIsNotAssigned
- && !isFinished(split.splitId())
- &&
verifyParentIsEitherFinishedOrCleanedUp(split);
- })
+ return knownSplits
+ .values()
+ .parallelStream()
+ .filter(this::checkIfSplitCanBeAssigned)
.collect(Collectors.toList());
}
public List<DynamoDBStreamsShardSplitWithAssignmentStatus>
snapshotState(long checkpointId) {
- return knownSplits.values().stream()
+ return knownSplits
+ .values()
+ .parallelStream()
.map(
split -> {
SplitAssignmentStatus assignmentStatus =
@@ -209,6 +262,7 @@ public class SplitTracker {
if (isSplitReadyToBeCleanedUp(finishedSplit, discoveredSplitIds)) {
finishedSplits.remove(finishedSplitId);
knownSplits.remove(finishedSplitId);
+ parentChildSplitMap.remove(finishedSplit.splitId());
}
}
}
diff --git
a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTrackerTest.java
b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTrackerTest.java
index 599bb2c..dc614a2 100644
---
a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTrackerTest.java
+++
b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTrackerTest.java
@@ -228,6 +228,91 @@ class SplitTrackerTest {
assertThat(pendingSplits).containsExactlyInAnyOrderElementsOf(splitsWithoutParents);
}
+    /**
+     * When every parent is finished, getUnassignedChildSplits must return exactly the children
+     * of those parents and none of the tracked splits that have no parent.
+     */
+    @Test
+    public void testOrderedUnassignedChildSplitsParentsFinishedAvailableForAssignment() {
+        List<Shard> finishedParentShards =
+                Arrays.asList(
+                        generateShard(1, "1000", "3000", null),
+                        generateShard(2, "2000", "4000", null),
+                        generateShard(3, "3000", "5000", null));
+        List<Shard> shardsWithParents =
+                Arrays.asList(
+                        generateShard(4, "4000", null, generateShardId(1)),
+                        generateShard(5, "5000", null, generateShardId(1)),
+                        generateShard(6, "6000", null, generateShardId(2)),
+                        generateShard(7, "7000", null, generateShardId(3)));
+        List<Shard> shardsWithoutParents =
+                Arrays.asList(
+                        generateShard(8, "8000", null, null),
+                        generateShard(9, "9000", null, null));
+
+        SplitTracker splitTracker = new SplitTracker(STREAM_ARN, InitialPosition.TRIM_HORIZON);
+        // The parentless shards must be tracked too; otherwise the negative assertion at the
+        // end of this test holds trivially and verifies nothing.
+        splitTracker.addSplits(
+                Stream.of(finishedParentShards, shardsWithParents, shardsWithoutParents)
+                        .flatMap(Collection::stream)
+                        .collect(Collectors.toList()));
+        splitTracker.markAsFinished(
+                finishedParentShards.stream().map(Shard::shardId).collect(Collectors.toList()));
+
+        List<DynamoDbStreamsShardSplit> unassignedChildSplits =
+                splitTracker.getUnassignedChildSplits(
+                        finishedParentShards.stream()
+                                .map(Shard::shardId)
+                                .collect(Collectors.toSet()));
+
+        assertThat(unassignedChildSplits)
+                .containsExactlyInAnyOrderElementsOf(
+                        shardsWithParents.stream()
+                                .map(this::getSplitFromShard)
+                                .collect(Collectors.toList()));
+        assertThat(unassignedChildSplits)
+                .doesNotContainAnyElementsOf(
+                        shardsWithoutParents.stream()
+                                .map(this::getSplitFromShard)
+                                .collect(Collectors.toList()));
+    }
+
+    /**
+     * Same scenario as the test above, except that one finished parent (shard 3) has no child
+     * shards. The lookup for that parent must simply contribute nothing to the result.
+     */
+    @Test
+    public void
+            testOrderedUnassignedChildSplitsWithUnknownParentsFinishedAvailableForAssignment() {
+        List<Shard> finishedParentShards =
+                Arrays.asList(
+                        generateShard(1, "1000", "3000", null),
+                        generateShard(2, "2000", "4000", null),
+                        generateShard(3, "3000", "5000", null));
+        // No shard lists shard 3 as its parent.
+        List<Shard> shardsWithParents =
+                Arrays.asList(
+                        generateShard(4, "4000", null, generateShardId(1)),
+                        generateShard(5, "5000", null, generateShardId(1)),
+                        generateShard(6, "6000", null, generateShardId(2)));
+        List<Shard> shardsWithoutParents =
+                Arrays.asList(
+                        generateShard(8, "8000", null, null),
+                        generateShard(9, "9000", null, null));
+
+        SplitTracker splitTracker = new SplitTracker(STREAM_ARN, InitialPosition.TRIM_HORIZON);
+        // The parentless shards must be tracked too; otherwise the negative assertion at the
+        // end of this test holds trivially and verifies nothing.
+        splitTracker.addSplits(
+                Stream.of(finishedParentShards, shardsWithParents, shardsWithoutParents)
+                        .flatMap(Collection::stream)
+                        .collect(Collectors.toList()));
+        splitTracker.markAsFinished(
+                finishedParentShards.stream().map(Shard::shardId).collect(Collectors.toList()));
+
+        List<DynamoDbStreamsShardSplit> unassignedChildSplits =
+                splitTracker.getUnassignedChildSplits(
+                        finishedParentShards.stream()
+                                .map(Shard::shardId)
+                                .collect(Collectors.toSet()));
+
+        assertThat(unassignedChildSplits)
+                .containsExactlyInAnyOrderElementsOf(
+                        shardsWithParents.stream()
+                                .map(this::getSplitFromShard)
+                                .collect(Collectors.toList()));
+        assertThat(unassignedChildSplits)
+                .doesNotContainAnyElementsOf(
+                        shardsWithoutParents.stream()
+                                .map(this::getSplitFromShard)
+                                .collect(Collectors.toList()));
+    }
+
@Test
public void
testOrderedMarkingParentSplitAsFinishedMakesChildrenAvailableForAssignment() {
List<Shard> shards =