hlteoh37 commented on code in PR #164:
URL: https://github.com/apache/flink-connector-aws/pull/164#discussion_r1756874586


##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java:
##########
@@ -86,17 +87,27 @@ public SplitTracker(
      */
     public void addSplits(Collection<Shard> shardsToAdd) {
         Set<String> discoveredShardIds =
-                shardsToAdd.stream().map(Shard::shardId).collect(Collectors.toSet());
+                shardsToAdd.parallelStream().map(Shard::shardId).collect(Collectors.toSet());

Review Comment:
   nit: Using `parallelStream` for this is a little bit much 🤣 
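   A plain sequential stream is likely enough here. The per-element work is just a getter call, and `parallelStream()` runs on the JVM-wide common `ForkJoinPool`, so the task-splitting overhead would probably outweigh any gain. A minimal sketch, where `ShardStub` and `ShardIdCollector` are hypothetical stand-ins (not the connector's types) so the snippet compiles on its own:

```java
import java.util.Collection;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-in for the AWS SDK Shard model; only shardId() is needed here.
final class ShardStub {
    private final String shardId;

    ShardStub(String shardId) {
        this.shardId = shardId;
    }

    String shardId() {
        return shardId;
    }
}

final class ShardIdCollector {
    private ShardIdCollector() {}

    // Sequential stream: mapping each shard to its ID is cheap, so there is
    // little to gain from splitting the work across the common ForkJoinPool.
    static Set<String> discoveredShardIds(Collection<ShardStub> shardsToAdd) {
        return shardsToAdd.stream().map(ShardStub::shardId).collect(Collectors.toSet());
    }
}
```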



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java:
##########
@@ -201,8 +202,21 @@ private SplitGraphInconsistencyTracker trackSplitsAndResolveInconsistencies(
     }
 
     private void assignSplits() {
+        List<DynamoDbStreamsShardSplit> splitsAvailableForAssignment =
+                splitTracker.splitsAvailableForAssignment();
+        assignSplitsInternal(splitsAvailableForAssignment);
+    }
+
+    private void assignSplits(Set<String> finishedSplitIds) {
+        List<DynamoDbStreamsShardSplit> splitsAvailableForAssignment =
+                splitTracker.splitsAvailableForAssignment(finishedSplitIds);
+        assignSplitsInternal(splitsAvailableForAssignment);
+    }

Review Comment:
   can we name these methods something like `assignChildSplits()` (the overload taking `finishedSplitIds`) and `assignAllAvailableSplits()` (the no-argument one)?



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java:
##########
@@ -159,12 +170,35 @@ public boolean isAssigned(String splitId) {
         return assignedSplits.contains(splitId);
     }
 
+    /**
+     * Function to get children splits available for given parent ids. This will ensure not to
+     * iterate all the values in knownSplits so saving compute
+     */
+    public List<DynamoDbStreamsShardSplit> splitsAvailableForAssignment(
+            Set<String> parentSplitIds) {
+        return parentSplitIds
+                .parallelStream()
+                .filter(parentChildSplitMap::containsKey)

Review Comment:
   can we log a warning here?
   
   If `parentChildSplitMap` doesn't have the key, that indicates something has gone wrong and data may have been lost.
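   One way to do this, sketched under simplifying assumptions: walk the parent IDs explicitly and warn on a miss, instead of silently dropping the ID in `filter(parentChildSplitMap::containsKey)`. `ChildSplitLookup` and its `Map<String, List<String>>` model are hypothetical, and `java.util.logging` is used only to keep the snippet dependency-free; in the connector an SLF4J `LOG.warn(...)` would be the natural equivalent.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Logger;

final class ChildSplitLookup {
    private static final Logger LOG = Logger.getLogger(ChildSplitLookup.class.getName());

    // Hypothetical simplified model: parent split ID -> child split IDs.
    private final Map<String, List<String>> parentChildSplitMap = new HashMap<>();

    void addChild(String parentId, String childId) {
        parentChildSplitMap.computeIfAbsent(parentId, k -> new ArrayList<>()).add(childId);
    }

    List<String> childSplitIds(Set<String> parentSplitIds) {
        List<String> children = new ArrayList<>();
        for (String parentId : parentSplitIds) {
            List<String> childIds = parentChildSplitMap.get(parentId);
            if (childIds == null) {
                // Missing lineage for a finished parent: warn instead of skipping silently.
                LOG.warning(
                        "No child splits found for finished parent split "
                                + parentId
                                + "; this may indicate data loss.");
                continue;
            }
            children.addAll(childIds);
        }
        return children;
    }
}
```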



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java:
##########
@@ -201,8 +202,21 @@ private SplitGraphInconsistencyTracker trackSplitsAndResolveInconsistencies(
     }
 
     private void assignSplits() {
+        List<DynamoDbStreamsShardSplit> splitsAvailableForAssignment =
+                splitTracker.splitsAvailableForAssignment();
+        assignSplitsInternal(splitsAvailableForAssignment);
+    }
+
+    private void assignSplits(Set<String> finishedSplitIds) {
+        List<DynamoDbStreamsShardSplit> splitsAvailableForAssignment =
+                splitTracker.splitsAvailableForAssignment(finishedSplitIds);
+        assignSplitsInternal(splitsAvailableForAssignment);
+    }

Review Comment:
   `assignSplitsInternal` can then be renamed to `assignSplits`.
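   Combined with the renames suggested in the earlier comment on this hunk, the three methods might end up looking roughly like this. `TrackerView`, `EnumeratorSketch`, and the use of `String` split IDs in place of `DynamoDbStreamsShardSplit` are hypothetical simplifications; the `assigned` list stands in for the real assignment call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical minimal view of the split tracker (String IDs instead of
// DynamoDbStreamsShardSplit); method names are the tracker's current ones.
interface TrackerView {
    List<String> splitsAvailableForAssignment();

    List<String> splitsAvailableForAssignment(Set<String> parentSplitIds);
}

final class EnumeratorSketch {
    private final TrackerView splitTracker;
    // Stand-in for the real assignment call: records what was handed out.
    final List<String> assigned = new ArrayList<>();

    EnumeratorSketch(TrackerView splitTracker) {
        this.splitTracker = splitTracker;
    }

    // Previously assignSplits(): assign every split that is currently available.
    void assignAllAvailableSplits() {
        assignSplits(splitTracker.splitsAvailableForAssignment());
    }

    // Previously assignSplits(Set<String>): only consider children of the finished splits.
    void assignChildSplits(Set<String> finishedSplitIds) {
        assignSplits(splitTracker.splitsAvailableForAssignment(finishedSplitIds));
    }

    // Previously assignSplitsInternal(): the shared assignment step.
    private void assignSplits(List<String> splitsAvailableForAssignment) {
        assigned.addAll(splitsAvailableForAssignment);
    }
}
```

   With this naming, each call site says which lookup it relies on, and the overload pair no longer hides the difference between a full scan and a children-only lookup.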



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java:
##########
@@ -159,12 +170,35 @@ public boolean isAssigned(String splitId) {
         return assignedSplits.contains(splitId);
     }
 
+    /**
+     * Function to get children splits available for given parent ids. This will ensure not to
+     * iterate all the values in knownSplits so saving compute
+     */
+    public List<DynamoDbStreamsShardSplit> splitsAvailableForAssignment(

Review Comment:
   nit: can we give this a more meaningful name, like `getUnassignedChildSplits`?
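   For illustration, a sketch of what a method under that name could look like, with a Javadoc that states the contract directly. The class, its fields, and the `String` split IDs are hypothetical simplifications of `SplitTracker`'s state; the result is sorted only to make it deterministic.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

final class ChildSplitIndex {
    // Hypothetical simplified state: parent ID -> child IDs, plus assigned IDs.
    private final Map<String, Set<String>> parentChildSplitMap = new HashMap<>();
    private final Set<String> assignedSplits = new HashSet<>();

    void addChild(String parentId, String childId) {
        parentChildSplitMap.computeIfAbsent(parentId, k -> new HashSet<>()).add(childId);
    }

    void markAssigned(String splitId) {
        assignedSplits.add(splitId);
    }

    /**
     * Returns the child splits of the given parents that have not been assigned yet. Only the
     * children of {@code parentSplitIds} are inspected, not every known split.
     */
    List<String> getUnassignedChildSplits(Set<String> parentSplitIds) {
        return parentSplitIds.stream()
                .flatMap(
                        parentId ->
                                parentChildSplitMap
                                        .getOrDefault(parentId, Collections.emptySet())
                                        .stream())
                .filter(childId -> !assignedSplits.contains(childId))
                .sorted() // deterministic order for callers and tests
                .collect(Collectors.toList());
    }
}
```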



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java:
##########
@@ -159,12 +170,35 @@ public boolean isAssigned(String splitId) {
         return assignedSplits.contains(splitId);
     }
 
+    /**
+     * Function to get children splits available for given parent ids. This will ensure not to
+     * iterate all the values in knownSplits so saving compute
+     */
+    public List<DynamoDbStreamsShardSplit> splitsAvailableForAssignment(
+            Set<String> parentSplitIds) {
+        return parentSplitIds
+                .parallelStream()
+                .filter(parentChildSplitMap::containsKey)
+                .filter(
+                        splitId -> {
+                            boolean splitIsNotAssigned = !isAssigned(splitId);
+                            DynamoDbStreamsShardSplit split = knownSplits.get(splitId);
+                            return splitIsNotAssigned
+                                    && !isFinished(splitId)
+                                    && verifyParentIsEitherFinishedOrCleanedUp(split);

Review Comment:
   can we extract this to a private method? That would avoid duplicating this essential logic.
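   A sketch of the extraction under simplified assumptions: both lookups delegate to a single private predicate, `isAvailableForAssignment`, so the not-assigned / not-finished / parent-check rule lives in one place. The class name, the single-parent `parentOf` map, and `parentIsFinishedOrCleanedUp` (an approximation of `verifyParentIsEitherFinishedOrCleanedUp`) are hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

final class AssignableSplitFilter {
    // Hypothetical bookkeeping mirroring the tracker's state, keyed by split ID.
    private final Set<String> knownSplits = new HashSet<>();
    private final Map<String, String> parentOf = new HashMap<>();
    private final Map<String, Set<String>> parentChildSplitMap = new HashMap<>();
    private final Set<String> assignedSplits = new HashSet<>();
    private final Set<String> finishedSplits = new HashSet<>();

    void addSplit(String splitId, String parentIdOrNull) {
        knownSplits.add(splitId);
        if (parentIdOrNull != null) {
            parentOf.put(splitId, parentIdOrNull);
            parentChildSplitMap.computeIfAbsent(parentIdOrNull, k -> new HashSet<>()).add(splitId);
        }
    }

    void markAssigned(String splitId) {
        assignedSplits.add(splitId);
    }

    void markFinished(String splitId) {
        finishedSplits.add(splitId);
    }

    // Full scan over every known split.
    List<String> splitsAvailableForAssignment() {
        return knownSplits.stream()
                .filter(this::isAvailableForAssignment)
                .sorted()
                .collect(Collectors.toList());
    }

    // Children-only lookup for the given parents.
    List<String> getUnassignedChildSplits(Set<String> parentSplitIds) {
        return parentSplitIds.stream()
                .flatMap(id -> parentChildSplitMap.getOrDefault(id, Collections.emptySet()).stream())
                .filter(this::isAvailableForAssignment)
                .sorted()
                .collect(Collectors.toList());
    }

    // Single home for the assignment rule shared by both lookups.
    private boolean isAvailableForAssignment(String splitId) {
        return !assignedSplits.contains(splitId)
                && !finishedSplits.contains(splitId)
                && parentIsFinishedOrCleanedUp(splitId);
    }

    // Approximates verifyParentIsEitherFinishedOrCleanedUp: no parent, a finished
    // parent, or a parent that is no longer tracked all count as ready.
    private boolean parentIsFinishedOrCleanedUp(String splitId) {
        String parentId = parentOf.get(splitId);
        return parentId == null
                || finishedSplits.contains(parentId)
                || !knownSplits.contains(parentId);
    }
}
```

   If the rule changes later (for example, a new split state), only `isAvailableForAssignment` needs to be touched, and the two lookups cannot drift apart.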



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
