gguptp commented on code in PR #164:
URL: 
https://github.com/apache/flink-connector-aws/pull/164#discussion_r1758606097


##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java:
##########
@@ -159,24 +173,63 @@ public boolean isAssigned(String splitId) {
         return assignedSplits.contains(splitId);
     }
 
+    /**
+     * Function to get children splits available for given parent ids. This will ensure not to
+     * iterate all the values in knownSplits so saving compute
+     */
+    public List<DynamoDbStreamsShardSplit> 
getUnassignedChildSplits(Set<String> parentSplitIds) {
+        return parentSplitIds
+                .parallelStream()
+                .filter(
+                        splitId -> {
+                            if (!parentChildSplitMap.containsKey(splitId)) {
+                                LOG.warn(
+                                        "splitId: {} is not present in 
parent-child relationship map. "
+                                                + "This indicates that there 
might be some data loss in the application",
+                                        splitId);
+                            }
+                            return parentChildSplitMap.containsKey(splitId);
+                        })
+                .map(parentChildSplitMap::get)
+                .flatMap(Set::stream)
+                .filter(this::checkIfSplitCanBeAssigned)
+                .map(knownSplits::get)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Tells whether a split can be assigned or not. Conditions which it checks:
+     *
+     * <p>- Split should not already be assigned.
+     *
+     * <p>- Split should not be already finished.
+     *
+     * <p>- The parent splits should either be finished or no longer be present in knownSplits.
+     */
+    private boolean checkIfSplitCanBeAssigned(String splitId) {
+        boolean splitIsNotAssigned = !isAssigned(splitId);
+        DynamoDbStreamsShardSplit split = knownSplits.get(splitId);
+        return splitIsNotAssigned
+                && !isFinished(splitId)
+                && verifyParentIsEitherFinishedOrCleanedUp(split);
+    }

Review Comment:
   Updating



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to