zhuzhurk commented on code in PR #21464:
URL: https://github.com/apache/flink/pull/21464#discussion_r1057075092


##########
flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumeratorContext.java:
##########
@@ -120,7 +123,14 @@ public void assignSplits(SplitsAssignment<SplitT> 
newSplitAssignments) {
     }
 
     @Override
-    public void signalNoMoreSplits(int subtask) {}
+    public void signalNoMoreSplits(int subtask) {
+        subtaskHasNoMoreSplits[subtask] = true;
+    }
+
+    @Override
+    public void signalIntermediateNoMoreSplits(int subtask) {
+        subtaskHasNoMoreSplits[subtask] = false;

Review Comment:
   This method should be empty. The values should be false initially but not 
set to be false in this method.



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java:
##########
@@ -157,7 +158,17 @@ private void sendSwitchSourceEvent(int subtaskId, int 
sourceIndex) {
                 LOG.debug("Restoring splits to subtask={} {}", subtaskId, 
splits);
                 context.assignSplits(
                         new 
SplitsAssignment<>(Collections.singletonMap(subtaskId, splits)));
-                context.signalNoMoreSplits(subtaskId);
+                if (context instanceof SupportsIntermediateNoMoreSplits) {

Review Comment:
   It's better to have a method to be reused, e.g. 
   ```
   private static void signalNoMoreSplits(
       SplitEnumeratorContext<HybridSourceSplit> context, 
       subtaskId, 
       int sourceIndex, 
       int sourceSize);
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java:
##########
@@ -289,6 +290,20 @@ public void signalNoMoreSplits(int subtask) {
                 "Failed to send 'NoMoreSplits' to reader " + subtask);
     }
 
+    @Override
+    public void signalIntermediateNoMoreSplits(int subtask) {
+        checkSubtaskIndex(subtask);
+
+        // It's an intermediate noMoreSplit event, notify subtask to deal with 
this event.
+        callInCoordinatorThread(
+                () -> {
+                    subtaskHasNoMoreSplits[subtask] = false;

Review Comment:
   Why is it needed to be set to false?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to