pnowojski commented on a change in pull request #16589:
URL: https://github.com/apache/flink/pull/16589#discussion_r679975199



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
##########
@@ -281,15 +320,50 @@ protected void declineCheckpoint(long checkpointId) {
         public void run() {
             try {
                 mainOperator.run(lock, operatorChain);
-                if (!wasStoppedExternally && !isCanceled()) {
-                    synchronized (lock) {
-                        operatorChain.setIgnoreEndOfInput(false);
-                    }
-                }
+                completeProcessing();
                 completionFuture.complete(null);
             } catch (Throwable t) {
                 // Note, t can be also an InterruptedException
-                completionFuture.completeExceptionally(t);
+                if (isCanceled()
+                        && ExceptionUtils.findThrowable(t, InterruptedException.class)
+                                .isPresent()) {
+                    completionFuture.completeExceptionally(new CancelTaskException(t));
+                } else if (finishingReason == FinishingReason.STOP_WITH_SAVEPOINT_DRAIN
+                        && ExceptionUtils.findThrowable(t, InterruptedException.class)
+                                .isPresent()) {
+                    // if we are stopping the source thread for stop-with-savepoint
+                    // we may actually return from run with an InterruptedException which
+                    // should be ignored (e.g. Kinesis case see FLINK-23528)
+                    try {
+                        // clear the interrupted status for the thread
+                        Thread.interrupted();
+                        completeProcessing();
+                        completionFuture.complete(null);
+                    } catch (Throwable e) {
+                        completionFuture.completeExceptionally(e);
+                    }
+                } else if (finishingReason == FinishingReason.STOP_WITH_SAVEPOINT_NO_DRAIN) {
+                    // swallow all exceptions if the source was stopped without drain
+                    completionFuture.complete(null);
+                } else {
+                    completionFuture.completeExceptionally(t);
+                }
+            }
+        }
+
+        private void completeProcessing() throws InterruptedException, ExecutionException {
+            if (finishingReason.shouldCallFinish() && !isCanceled() && !isFailing()) {

Review comment:
       Can we unify `finishingReason` with `isCanceled()` or `isFailing()`? Or would you prefer to avoid doing that in this PR?
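   For illustration only, one hypothetical reading of this suggestion is to fold the task-state checks into a single helper so `completeProcessing()` asks one question; the names mirror the diff, but the helper itself is not part of the PR:

```java
// Hypothetical helper, not taken from the PR: combines the finishing reason with the
// task's cancel/fail state, giving a single place that decides whether finish() runs.
private boolean shouldFinishOperators() {
    // the finishing reason decides whether this way of ending wants finish() at all;
    // cancellation or failure of the task always vetoes it
    return finishingReason.shouldCallFinish() && !isCanceled() && !isFailing();
}
```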

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
##########
@@ -281,15 +320,50 @@ protected void declineCheckpoint(long checkpointId) {
         public void run() {
             try {
                 mainOperator.run(lock, operatorChain);
-                if (!wasStoppedExternally && !isCanceled()) {
-                    synchronized (lock) {
-                        operatorChain.setIgnoreEndOfInput(false);
-                    }
-                }
+                completeProcessing();
                 completionFuture.complete(null);
             } catch (Throwable t) {
                 // Note, t can be also an InterruptedException
-                completionFuture.completeExceptionally(t);
+                if (isCanceled()
+                        && ExceptionUtils.findThrowable(t, InterruptedException.class)
+                                .isPresent()) {
+                    completionFuture.completeExceptionally(new CancelTaskException(t));
+                } else if (finishingReason == FinishingReason.STOP_WITH_SAVEPOINT_DRAIN
+                        && ExceptionUtils.findThrowable(t, InterruptedException.class)
+                                .isPresent()) {
+                    // if we are stopping the source thread for stop-with-savepoint
+                    // we may actually return from run with an InterruptedException which
+                    // should be ignored (e.g. Kinesis case see FLINK-23528)
+                    try {
+                        // clear the interrupted status for the thread
+                        Thread.interrupted();
+                        completeProcessing();
+                        completionFuture.complete(null);
+                    } catch (Throwable e) {
+                        completionFuture.completeExceptionally(e);
+                    }
+                } else if (finishingReason == FinishingReason.STOP_WITH_SAVEPOINT_NO_DRAIN) {
+                    // swallow all exceptions if the source was stopped without drain
+                    completionFuture.complete(null);
+                } else {
+                    completionFuture.completeExceptionally(t);
+                }
+            }
+        }
+
+        private void completeProcessing() throws InterruptedException, ExecutionException {
+            if (finishingReason.shouldCallFinish() && !isCanceled() && !isFailing()) {
+                CompletableFuture<Void> endOfDataConsumed = new CompletableFuture<>();
+                mainMailboxExecutor.execute(
+                        () -> {
+                            // theoretically the StreamSource can implement BoundedOneInput, so we
+                            // need to call it here
+                            operatorChain.endInput(1);
+                            endData();
+                            endOfDataConsumed.complete(null);
+                        },
+                        "SourceStreamTask finished processing data.");
+                endOfDataConsumed.get();

Review comment:
       `mainMailboxExecutor.submit().get()`?
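   A rough sketch of what that simplification could look like (assuming a `MailboxExecutor#submit` variant that returns a `Future`, as the suggestion implies; this is an illustration, not code from the PR):

```java
private void completeProcessing() throws InterruptedException, ExecutionException {
    if (finishingReason.shouldCallFinish() && !isCanceled() && !isFailing()) {
        // submit() already returns a Future, so the manually completed
        // endOfDataConsumed CompletableFuture from the diff would no longer be needed
        mainMailboxExecutor
                .submit(
                        () -> {
                            // theoretically the StreamSource can implement BoundedOneInput,
                            // so endInput() still needs to be called here
                            operatorChain.endInput(1);
                            endData();
                        },
                        "SourceStreamTask finished processing data.")
                .get();
    }
}
```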

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
##########
@@ -281,15 +320,50 @@ protected void declineCheckpoint(long checkpointId) {
         public void run() {
             try {
                 mainOperator.run(lock, operatorChain);
-                if (!wasStoppedExternally && !isCanceled()) {
-                    synchronized (lock) {
-                        operatorChain.setIgnoreEndOfInput(false);
-                    }
-                }
+                completeProcessing();
                 completionFuture.complete(null);
             } catch (Throwable t) {
                 // Note, t can be also an InterruptedException
-                completionFuture.completeExceptionally(t);
+                if (isCanceled()
+                        && ExceptionUtils.findThrowable(t, InterruptedException.class)
+                                .isPresent()) {
+                    completionFuture.completeExceptionally(new CancelTaskException(t));
+                } else if (finishingReason == FinishingReason.STOP_WITH_SAVEPOINT_DRAIN
+                        && ExceptionUtils.findThrowable(t, InterruptedException.class)
+                                .isPresent()) {
+                    // if we are stopping the source thread for stop-with-savepoint
+                    // we may actually return from run with an InterruptedException which
+                    // should be ignored (e.g. Kinesis case see FLINK-23528)
+                    try {
+                        // clear the interrupted status for the thread
+                        Thread.interrupted();
+                        completeProcessing();
+                        completionFuture.complete(null);
+                    } catch (Throwable e) {
+                        completionFuture.completeExceptionally(e);
+                    }

Review comment:
       Do we need to handle this case here? Can't we just ignore it and treat it as a bug?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
##########
@@ -261,6 +277,29 @@ private void interruptSourceThread(boolean interrupt) {
         }
     }
 
+    private CompletableFuture<Boolean> triggerStopWithSavepointWithDrain(

Review comment:
       nit: `triggerStopWithSavepointWithDrainAsync`?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -472,40 +486,60 @@ protected void processInput(MailboxDefaultAction.Controller controller) throws E
                        new ResumeWrapper(controller.suspendDefaultAction(timer), timer)));
     }
 
-    private void resetSynchronousSavepointId(long id, boolean succeeded) {
-        if (!succeeded && activeSyncSavepointId != null && activeSyncSavepointId == id) {
-            // allow to process further EndOfPartition events
-            activeSyncSavepointId = null;
-            operatorChain.setIgnoreEndOfInput(false);
+    protected void endData() throws Exception {
+        advanceToEndOfEventTime();
+        // finish all operators in a chain effect way
+        operatorChain.finishOperators(actionExecutor);
+
+        for (ResultPartitionWriter partitionWriter : getEnvironment().getAllWriters()) {
+            partitionWriter.notifyEndOfData();
         }
-        syncSavepointId = null;
+
+        this.endOfDataReceived = true;
     }

Review comment:
       You are asynchronously enqueuing `endData()` in a couple of different places. I have a feeling that if there is a race between the mailbox action executing and the (source?) operator finishing on its own, `endData()` will end up being called twice, and this method doesn't handle that well?
   
   I also don't like that we are calling `endInput()` in many places. Why don't we have a common code path for calling `endInput()` when the task finishes, and just different ways for the task to reach that point? In this regard I think the previous behaviour in `afterInvoke()` was better.
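   One minimal way to make a double invocation harmless would be to turn `endData()` into a no-op after its first run, e.g. by checking the `endOfDataReceived` flag that the diff already sets (purely a sketch, not a change proposed in the PR):

```java
protected void endData() throws Exception {
    // hypothetical guard: bail out if a racing caller already ran the shutdown sequence
    if (endOfDataReceived) {
        return;
    }
    advanceToEndOfEventTime();
    // finish all operators in a chain effect way
    operatorChain.finishOperators(actionExecutor);

    for (ResultPartitionWriter partitionWriter : getEnvironment().getAllWriters()) {
        partitionWriter.notifyEndOfData();
    }

    this.endOfDataReceived = true;
}
```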

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -486,40 +489,48 @@ protected void endData() throws Exception {
         this.endOfDataReceived = true;
     }
 
-    private void resetSynchronousSavepointId(long id, boolean succeeded) {
-        if (!succeeded && activeSyncSavepointId != null && activeSyncSavepointId == id) {
-            // allow to process further EndOfPartition events
-            activeSyncSavepointId = null;
-            operatorChain.setIgnoreEndOfInput(false);
-        }
-        syncSavepointId = null;
-    }
-
-    private void setSynchronousSavepointId(long checkpointId, boolean ignoreEndOfInput) {
+    protected void setSynchronousSavepoint(long checkpointId, boolean isDrain) {
         checkState(
-                syncSavepointId == null,
+                syncSavepointWithoutDrain == null
+                        && (syncSavepointWithDrain == null
+                                || (isDrain && syncSavepointWithDrain == checkpointId)),
                 "at most one stop-with-savepoint checkpoint at a time is allowed");
-        syncSavepointId = checkpointId;
-        activeSyncSavepointId = checkpointId;
-        operatorChain.setIgnoreEndOfInput(ignoreEndOfInput);
+        if (isDrain) {
+            if (syncSavepointWithDrain == null) {
+                syncSavepointWithDrain = checkpointId;
+                savepointCompletedFuture = new CompletableFuture<>();
+            }
+        } else {
+            syncSavepointWithoutDrain = checkpointId;
+        }
     }
 
     @VisibleForTesting
     OptionalLong getSynchronousSavepointId() {
-        return syncSavepointId != null ? OptionalLong.of(syncSavepointId) : OptionalLong.empty();
+        if (syncSavepointWithoutDrain != null) {
+            return OptionalLong.of(syncSavepointWithoutDrain);
+        } else if (syncSavepointWithDrain != null) {
+            return OptionalLong.of(syncSavepointWithDrain);
+        } else {
+            return OptionalLong.empty();
+        }
+    }
+
+    private boolean isCurrentSavepointWithDrain(long checkpointId) {
+        return syncSavepointWithDrain != null && syncSavepointWithDrain == checkpointId;

Review comment:
       As you prefer



