This is an automated email from the ASF dual-hosted git repository.

hong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git


The following commit(s) were added to refs/heads/main by this push:
     new 9b0d983  [FLINK-36270][Connectors/DynamoDB] Assign only children splits of a split when it is marked as finished
9b0d983 is described below

commit 9b0d98343a892be64d25d26064108aa27a255bd0
Author: gguptp <[email protected]>
AuthorDate: Thu Sep 12 17:40:26 2024 +0530

    [FLINK-36270][Connectors/DynamoDB] Assign only children splits of a split when it is marked as finished
---
 .../DynamoDbStreamsSourceEnumerator.java           | 21 +++++-
 .../source/enumerator/tracker/SplitTracker.java    | 74 ++++++++++++++++---
 .../enumerator/tracker/SplitTrackerTest.java       | 85 ++++++++++++++++++++++
 3 files changed, 166 insertions(+), 14 deletions(-)

diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java
index e62c0f3..8a8f29c 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java
@@ -128,6 +128,7 @@ public class DynamoDbStreamsSourceEnumerator
         }
     }
 
+    /** When we mark a split as finished, we will only assign its child splits to the subtasks. */
     private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent 
splitsFinishedEvent) {
         splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds());
         splitAssignment
@@ -137,7 +138,7 @@ public class DynamoDbStreamsSourceEnumerator
                                 splitsFinishedEvent
                                         .getFinishedSplitIds()
                                         .contains(split.splitId()));
-        assignSplits();
+        assignChildSplits(splitsFinishedEvent.getFinishedSplitIds());
     }
 
     private void processDiscoveredSplits(ListShardsResult discoveredSplits, 
Throwable throwable) {
@@ -161,7 +162,7 @@ public class DynamoDbStreamsSourceEnumerator
                     context.registeredReaders().size());
             return;
         }
-        assignSplits();
+        assignAllAvailableSplits();
     }
 
     /**
@@ -200,9 +201,21 @@ public class DynamoDbStreamsSourceEnumerator
         return splitGraphInconsistencyTracker;
     }
 
-    private void assignSplits() {
+    private void assignAllAvailableSplits() {
+        List<DynamoDbStreamsShardSplit> splitsAvailableForAssignment =
+                splitTracker.splitsAvailableForAssignment();
+        assignSplits(splitsAvailableForAssignment);
+    }
+
+    private void assignChildSplits(Set<String> finishedSplitIds) {
+        List<DynamoDbStreamsShardSplit> splitsAvailableForAssignment =
+                splitTracker.getUnassignedChildSplits(finishedSplitIds);
+        assignSplits(splitsAvailableForAssignment);
+    }
+
+    private void assignSplits(List<DynamoDbStreamsShardSplit> 
splitsAvailableForAssignment) {
         Map<Integer, List<DynamoDbStreamsShardSplit>> newSplitAssignments = 
new HashMap<>();
-        for (DynamoDbStreamsShardSplit split : 
splitTracker.splitsAvailableForAssignment()) {
+        for (DynamoDbStreamsShardSplit split : splitsAvailableForAssignment) {
             assignSplitToSubtask(split, newSplitAssignments);
         }
         updateSplitAssignment(newSplitAssignments);
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java
index 2269ce6..73885dd 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java
@@ -28,6 +28,8 @@ import org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSpli
 import org.apache.flink.connector.dynamodb.source.split.StartingPosition;
 import org.apache.flink.connector.dynamodb.source.util.ShardUtils;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.services.dynamodb.model.Shard;
 
 import java.util.Collection;
@@ -50,10 +52,12 @@ import static org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignm
 @Internal
 public class SplitTracker {
     private final Map<String, DynamoDbStreamsShardSplit> knownSplits = new 
ConcurrentHashMap<>();
+    private final Map<String, Set<String>> parentChildSplitMap = new 
ConcurrentHashMap<>();
     private final Set<String> assignedSplits = new HashSet<>();
     private final Set<String> finishedSplits = new HashSet<>();
     private final String streamArn;
     private final InitialPosition initialPosition;
+    private static final Logger LOG = 
LoggerFactory.getLogger(SplitTracker.class);
 
     public SplitTracker(String streamArn, InitialPosition initialPosition) {
         this(Collections.emptyList(), streamArn, initialPosition);
@@ -69,7 +73,7 @@ public class SplitTracker {
                 splitWithStatus -> {
                     DynamoDbStreamsShardSplit currentSplit = 
splitWithStatus.split();
                     knownSplits.put(currentSplit.splitId(), currentSplit);
-
+                    addSplitToMapping(currentSplit);
                     if (ASSIGNED.equals(splitWithStatus.assignmentStatus())) {
                         assignedSplits.add(splitWithStatus.split().splitId());
                     }
@@ -93,10 +97,20 @@ public class SplitTracker {
                 DynamoDbStreamsShardSplit newSplit =
                         mapToSplit(shard, getStartingPosition(shard, 
discoveredShardIds));
                 knownSplits.put(shardId, newSplit);
+                addSplitToMapping(newSplit);
             }
         }
     }
 
+    private void addSplitToMapping(DynamoDbStreamsShardSplit split) {
+        if (split.getParentShardId() == null) {
+            return;
+        }
+        parentChildSplitMap
+                .computeIfAbsent(split.getParentShardId(), k -> new 
HashSet<>())
+                .add(split.splitId());
+    }
+
     /**
      * If there is no parent in knownSplits of the current shard, that means that the parent has
      * been read already, so we start the current shard with the specified initial position. We do
@@ -159,24 +173,63 @@ public class SplitTracker {
         return assignedSplits.contains(splitId);
     }
 
+    /**
+     * Function to get children splits available for given parent ids. This will ensure not to
+     * iterate all the values in knownSplits so saving compute
+     */
+    public List<DynamoDbStreamsShardSplit> 
getUnassignedChildSplits(Set<String> parentSplitIds) {
+        return parentSplitIds
+                .parallelStream()
+                .filter(
+                        splitId -> {
+                            if (!parentChildSplitMap.containsKey(splitId)) {
+                                LOG.warn(
+                                        "splitId: {} is not present in 
parent-child relationship map. "
+                                                + "This indicates that there 
might be some data loss in the application",
+                                        splitId);
+                            }
+                            return parentChildSplitMap.containsKey(splitId);
+                        })
+                .map(parentChildSplitMap::get)
+                .flatMap(Set::stream)
+                .filter(knownSplits::containsKey)
+                .map(knownSplits::get)
+                .filter(this::checkIfSplitCanBeAssigned)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Tells whether a split can be assigned or not. Conditions which it checks:
+     *
+     * <p>- Split should not already be assigned.
+     *
+     * <p>- Split should not be already finished.
+     *
+     * <p>- The parent splits should either be finished or no longer be present in knownSplits.
+     */
+    private boolean checkIfSplitCanBeAssigned(DynamoDbStreamsShardSplit split) 
{
+        boolean splitIsNotAssigned = !isAssigned(split.splitId());
+        return splitIsNotAssigned
+                && !isFinished(split.splitId())
+                && verifyParentIsEitherFinishedOrCleanedUp(split);
+    }
+
     /**
      * Since we never put an inconsistent shard lineage to splitTracker, so if a shard's parent is
      * not there, that means that that should already be cleaned up.
      */
     public List<DynamoDbStreamsShardSplit> splitsAvailableForAssignment() {
-        return knownSplits.values().stream()
-                .filter(
-                        split -> {
-                            boolean splitIsNotAssigned = 
!isAssigned(split.splitId());
-                            return splitIsNotAssigned
-                                    && !isFinished(split.splitId())
-                                    && 
verifyParentIsEitherFinishedOrCleanedUp(split);
-                        })
+        return knownSplits
+                .values()
+                .parallelStream()
+                .filter(this::checkIfSplitCanBeAssigned)
                 .collect(Collectors.toList());
     }
 
     public List<DynamoDBStreamsShardSplitWithAssignmentStatus> 
snapshotState(long checkpointId) {
-        return knownSplits.values().stream()
+        return knownSplits
+                .values()
+                .parallelStream()
                 .map(
                         split -> {
                             SplitAssignmentStatus assignmentStatus =
@@ -209,6 +262,7 @@ public class SplitTracker {
             if (isSplitReadyToBeCleanedUp(finishedSplit, discoveredSplitIds)) {
                 finishedSplits.remove(finishedSplitId);
                 knownSplits.remove(finishedSplitId);
+                parentChildSplitMap.remove(finishedSplit.splitId());
             }
         }
     }
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTrackerTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTrackerTest.java
index 599bb2c..dc614a2 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTrackerTest.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTrackerTest.java
@@ -228,6 +228,91 @@ class SplitTrackerTest {
         
assertThat(pendingSplits).containsExactlyInAnyOrderElementsOf(splitsWithoutParents);
     }
 
+    @Test
+    public void 
testOrderedUnassignedChildSplitsParentsFinishedAvailableForAssignment() {
+        List<Shard> finishedParentShards =
+                Arrays.asList(
+                        generateShard(1, "1000", "3000", null),
+                        generateShard(2, "2000", "4000", null),
+                        generateShard(3, "3000", "5000", null));
+        List<Shard> shardsWithParents =
+                Arrays.asList(
+                        generateShard(4, "4000", null, generateShardId(1)),
+                        generateShard(5, "5000", null, generateShardId(1)),
+                        generateShard(6, "6000", null, generateShardId(2)),
+                        generateShard(7, "7000", null, generateShardId(3)));
+        List<Shard> shardsWithoutParents =
+                Arrays.asList(
+                        generateShard(8, "8000", null, null), generateShard(9, 
"9000", null, null));
+
+        SplitTracker splitTracker = new SplitTracker(STREAM_ARN, 
InitialPosition.TRIM_HORIZON);
+        splitTracker.addSplits(
+                Stream.of(finishedParentShards, shardsWithParents, 
shardsWithParents)
+                        .flatMap(Collection::stream)
+                        .collect(Collectors.toList()));
+        splitTracker.markAsFinished(
+                
finishedParentShards.stream().map(Shard::shardId).collect(Collectors.toList()));
+
+        List<DynamoDbStreamsShardSplit> unassignedChildSplits =
+                splitTracker.getUnassignedChildSplits(
+                        finishedParentShards.stream()
+                                .map(Shard::shardId)
+                                .collect(Collectors.toSet()));
+
+        assertThat(unassignedChildSplits)
+                .containsExactlyInAnyOrderElementsOf(
+                        shardsWithParents.stream()
+                                .map(this::getSplitFromShard)
+                                .collect(Collectors.toList()));
+        assertThat(unassignedChildSplits)
+                .doesNotContainAnyElementsOf(
+                        shardsWithoutParents.stream()
+                                .map(this::getSplitFromShard)
+                                .collect(Collectors.toList()));
+    }
+
+    @Test
+    public void 
testOrderedUnassignedChildSplitsWithUnknownParentsFinishedAvailableForAssignment()
 {
+        List<Shard> finishedParentShards =
+                Arrays.asList(
+                        generateShard(1, "1000", "3000", null),
+                        generateShard(2, "2000", "4000", null),
+                        generateShard(3, "3000", "5000", null));
+        List<Shard> shardsWithParents =
+                Arrays.asList(
+                        generateShard(4, "4000", null, generateShardId(1)),
+                        generateShard(5, "5000", null, generateShardId(1)),
+                        generateShard(6, "6000", null, generateShardId(2)));
+        List<Shard> shardsWithoutParents =
+                Arrays.asList(
+                        generateShard(8, "8000", null, null), generateShard(9, 
"9000", null, null));
+
+        SplitTracker splitTracker = new SplitTracker(STREAM_ARN, 
InitialPosition.TRIM_HORIZON);
+        splitTracker.addSplits(
+                Stream.of(finishedParentShards, shardsWithParents, 
shardsWithParents)
+                        .flatMap(Collection::stream)
+                        .collect(Collectors.toList()));
+        splitTracker.markAsFinished(
+                
finishedParentShards.stream().map(Shard::shardId).collect(Collectors.toList()));
+
+        List<DynamoDbStreamsShardSplit> unassignedChildSplits =
+                splitTracker.getUnassignedChildSplits(
+                        finishedParentShards.stream()
+                                .map(Shard::shardId)
+                                .collect(Collectors.toSet()));
+
+        assertThat(unassignedChildSplits)
+                .containsExactlyInAnyOrderElementsOf(
+                        shardsWithParents.stream()
+                                .map(this::getSplitFromShard)
+                                .collect(Collectors.toList()));
+        assertThat(unassignedChildSplits)
+                .doesNotContainAnyElementsOf(
+                        shardsWithoutParents.stream()
+                                .map(this::getSplitFromShard)
+                                .collect(Collectors.toList()));
+    }
+
     @Test
     public void 
testOrderedMarkingParentSplitAsFinishedMakesChildrenAvailableForAssignment() {
         List<Shard> shards =

Reply via email to