pnowojski commented on a change in pull request #16589:
URL: https://github.com/apache/flink/pull/16589#discussion_r678949633



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
##########
@@ -287,13 +292,13 @@ public DataInputStatus emitNext(DataOutput<OUT> output) throws Exception {
 
         // short circuit the common case (every invocation except the first)
         if (currentMainOutput != null) {
-            return convertToInternalStatus(sourceReader.pollNext(currentMainOutput));
+            return convertToInternalStatus(readingReader.pollNext(currentMainOutput));

Review comment:
       I'm a bit confused by this `readingReader` concept co-existing with the 
`sourceReader`. 
   
   1. If anything, I think it would be better to wrap the existing reader, 
`sourceReader = new DrainedSourceReader(sourceReader)`, and proxy/override the 
relevant calls, but...
   2. I think it would be simpler and cleaner to just `return END_OF_INPUT;` 
here directly, and drop the `DrainedSourceReader` class.
   
   Option 2 has the additional benefit of avoiding another `SourceReader` 
implementation being loaded. With just one or two implementations, the JVM is 
able to devirtualise and inline `pollNext()` calls, so I would avoid adding 
`DrainedSourceReader` for that reason alone.
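   A minimal sketch of what option 2 could look like. `Status`, `Reader` and `DrainAwareOperator` are hypothetical, simplified stand-ins invented for illustration; they are not Flink's actual `DataInputStatus`, `SourceReader` or `SourceOperator`.

```java
// Hypothetical, simplified stand-ins; not Flink's real types.
enum Status { MORE_AVAILABLE, END_OF_INPUT }

interface Reader {
    Status pollNext();
}

final class DrainAwareOperator {
    private final Reader sourceReader;
    // Set once the operator has been asked to drain.
    private boolean drained;

    DrainAwareOperator(Reader sourceReader) {
        this.sourceReader = sourceReader;
    }

    void drain() {
        drained = true;
    }

    Status emitNext() {
        // Short-circuit with a flag instead of swapping in a second Reader
        // implementation, so no extra receiver type reaches pollNext().
        if (drained) {
            return Status.END_OF_INPUT;
        }
        return sourceReader.pollNext();
    }
}
```

   The trade-off is one extra branch on the hot path in exchange for not introducing a second reader implementation.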

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
##########
@@ -114,6 +114,8 @@
     /** The source reader that does most of the work. */
     private SourceReader<OUT, SplitT> sourceReader;
 
+    private SourceReader<OUT, SplitT> readingReader;

Review comment:
       Missing close in `close()`

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
##########
@@ -267,6 +270,8 @@ public void finish() throws Exception {
         if (eventTimeLogic != null) {
             eventTimeLogic.stopPeriodicWatermarkEmits();
         }
+        dataFinished = true;

Review comment:
       nit: shouldn't this belong to the previous commit?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
##########
@@ -68,6 +68,12 @@
      */
     private volatile boolean wasStoppedExternally = false;
 
+    /**
+     * Indicates whether the source operator has been cancelled for stop-with-savepoint --drain, in
+     * this case we want to ignore interrupt exceptions thrown when stopping.
+     */
+    private volatile boolean wasDrained = false;
+

Review comment:
       What's the difference between `wasStoppedExternally` and `wasDrained`? 
According to the javadoc, the first one is for `stop-with-savepoint` with or 
without drain, while the latter is only for the drain case?
   
   If you really need to add another flag, this is getting a bit too 
complicated for my taste with all of those flags: `canceled`, `failing`, 
`wasStoppedExternally`, `wasDrained`, `isRunning`.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -119,9 +113,13 @@ protected void finishTask() throws Exception {
     }
 
     @Override
-    public Future<Boolean> triggerCheckpointAsync(
+    public CompletableFuture<Boolean> triggerCheckpointAsync(
             CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
         if (!isExternallyInducedSource) {
+            if (checkpointOptions.getCheckpointType().shouldDrain()) {
+                setSynchronousSavepoint(checkpointMetaData.getCheckpointId(), true);
+                mainMailboxExecutor.execute(this::endData, "Drain pipeline on stop-with-savepoint");
+            }
             return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);

Review comment:
       Is it ok that `endData()` and `triggerCheckpointAsyncXXX()` will be two 
different mailbox actions, with the potential for something to happen in 
between them? 
   
   Even if that's ok for now, it would be safer for the future to avoid such 
race conditions. Is there an easy way to execute both things inside a single 
mailbox action?
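   As a toy illustration of the "single mailbox action" idea: `ToyMailbox` and `SingleActionDemo` below are hypothetical and only model a FIFO queue of actions; they are not Flink's `MailboxExecutor`, and the log strings merely stand in for the two real calls.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical toy mailbox: actions run one at a time, in FIFO order.
final class ToyMailbox {
    private final Queue<Runnable> mails = new ArrayDeque<>();

    void execute(Runnable action) {
        mails.add(action);
    }

    void drainAll() {
        Runnable next;
        while ((next = mails.poll()) != null) {
            next.run();
        }
    }
}

final class SingleActionDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        ToyMailbox mailbox = new ToyMailbox();

        // Both steps are enqueued as ONE mail, so no other mail can run
        // between them.
        mailbox.execute(() -> {
            log.add("endData");
            log.add("triggerCheckpoint");
        });
        // A mail enqueued afterwards can only run after the combined action.
        mailbox.execute(() -> log.add("other"));

        mailbox.drainAll();
        return log;
    }
}
```

   With two separate `execute` calls, a mail enqueued by another party between them could be processed in the middle; bundling removes that window by construction.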

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapper.java
##########
@@ -120,9 +121,8 @@ void setNext(StreamOperatorWrapper next) {
      * MailboxExecutor#yield()} to take the mails of closing operator and running timers and run
      * them.
      */
-    public void finish(StreamTaskActionExecutor actionExecutor, boolean isStoppingBySyncSavepoint)
-            throws Exception {
-        if (!isHead && !isStoppingBySyncSavepoint) {
+    public void finish(StreamTaskActionExecutor actionExecutor) throws Exception {
+        if (!isHead || wrapped instanceof StreamSource) {

Review comment:
       `StreamSource` getting `endInput()` call? Do we need that?
   
   If yes, can we place this logic inside `SourceStreamTask` maybe? It will be 
easier to clean up the code once we drop legacy sources.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -1277,10 +1291,14 @@ public final ExecutorService getAsyncOperationsThreadPool() {
     private void notifyCheckpointComplete(long checkpointId) throws Exception {
         subtaskCheckpointCoordinator.notifyCheckpointComplete(
                 checkpointId, operatorChain, this::isRunning);
-        if (isRunning && isSynchronousSavepointId(checkpointId)) {
-            finishTask();
-            // Reset to "notify" the internal synchronous savepoint mailbox loop.
-            resetSynchronousSavepointId(checkpointId, true);
+        if (isRunning) {
+            if (isCurrentSavepointWithoutDrain(checkpointId)) {
+                finishTask();
+                // Reset to "notify" the internal synchronous savepoint mailbox loop.
+                syncSavepointWithoutDrain = null;
+            } else if (isCurrentSavepointWithDrain(checkpointId)) {
+                savepointCompletedFuture.complete(null);
+            }

Review comment:
       Do we have a ticket for unifying this behaviour as per offline 
discussion? 

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
##########
@@ -249,10 +248,28 @@ private void interruptSourceThread(boolean interrupt) {
     // ------------------------------------------------------------------------
 
     @Override
-    public Future<Boolean> triggerCheckpointAsync(
+    public CompletableFuture<Boolean> triggerCheckpointAsync(
             CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
         if (!externallyInducedCheckpoints) {
-            return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
+            if (checkpointOptions.getCheckpointType().shouldDrain()) {
+                mainMailboxExecutor.execute(
+                        () -> {
+                            setSynchronousSavepoint(checkpointMetaData.getCheckpointId(), true);
+                            wasDrained = true;
+                            if (mainOperator != null) {
+                                mainOperator.stop();
+                            }
+                        },
+                        "stop legacy source and set synchronous savepoint");
+                return sourceThread
+                        .getCompletionFuture()
+                        .thenCompose(
+                                ignore ->
+                                        super.triggerCheckpointAsync(
+                                                checkpointMetaData, checkpointOptions));

Review comment:
       What if the source doesn't complete? I think it would behave a little 
bit differently compared to right now?
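   To make the concern concrete, here is a small sketch using only `java.util.concurrent.CompletableFuture`. `ComposeDemo` is a hypothetical name, and the already-completed future stands in for `super.triggerCheckpointAsync(...)`. The function passed to `thenCompose` is not invoked until the upstream future completes, so a source thread that never finishes would leave the returned future pending.

```java
import java.util.concurrent.CompletableFuture;

final class ComposeDemo {
    // "sourceDone" plays the role of the source thread's completion future;
    // the completed future stands in for the real checkpoint trigger.
    static CompletableFuture<Boolean> triggerAfter(CompletableFuture<Void> sourceDone) {
        return sourceDone.thenCompose(ignore -> CompletableFuture.completedFuture(true));
    }

    public static void main(String[] args) {
        CompletableFuture<Void> sourceDone = new CompletableFuture<>();
        CompletableFuture<Boolean> triggered = triggerAfter(sourceDone);

        System.out.println(triggered.isDone()); // still pending: source has not completed
        sourceDone.complete(null);
        System.out.println(triggered.join());   // resolves only after the source completes
    }
}
```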

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -486,40 +489,48 @@ protected void endData() throws Exception {
         this.endOfDataReceived = true;
     }
 
-    private void resetSynchronousSavepointId(long id, boolean succeeded) {
-        if (!succeeded && activeSyncSavepointId != null && activeSyncSavepointId == id) {
-            // allow to process further EndOfPartition events
-            activeSyncSavepointId = null;
-            operatorChain.setIgnoreEndOfInput(false);
-        }
-        syncSavepointId = null;
-    }
-
-    private void setSynchronousSavepointId(long checkpointId, boolean ignoreEndOfInput) {
+    protected void setSynchronousSavepoint(long checkpointId, boolean isDrain) {
         checkState(
-                syncSavepointId == null,
+                syncSavepointWithoutDrain == null
+                        && (syncSavepointWithDrain == null
+                                || (isDrain && syncSavepointWithDrain == checkpointId)),
                 "at most one stop-with-savepoint checkpoint at a time is allowed");
-        syncSavepointId = checkpointId;
-        activeSyncSavepointId = checkpointId;
-        operatorChain.setIgnoreEndOfInput(ignoreEndOfInput);
+        if (isDrain) {
+            if (syncSavepointWithDrain == null) {
+                syncSavepointWithDrain = checkpointId;
+                savepointCompletedFuture = new CompletableFuture<>();
+            }
+        } else {
+            syncSavepointWithoutDrain = checkpointId;
+        }
     }
 
     @VisibleForTesting
     OptionalLong getSynchronousSavepointId() {
-        return syncSavepointId != null ? OptionalLong.of(syncSavepointId) : OptionalLong.empty();
+        if (syncSavepointWithoutDrain != null) {
+            return OptionalLong.of(syncSavepointWithoutDrain);
+        } else if (syncSavepointWithDrain != null) {
+            return OptionalLong.of(syncSavepointWithDrain);
+        } else {
+            return OptionalLong.empty();
+        }
+    }
+
+    private boolean isCurrentSavepointWithDrain(long checkpointId) {
+        return syncSavepointWithDrain != null && syncSavepointWithDrain == checkpointId;

Review comment:
       nit: do we need the null check in front? If so, maybe `Objects.equals()`?
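   For context, a short sketch of the two variants, assuming the suggestion refers to `java.util.Objects.equals`. `SavepointIdCheck` is a hypothetical class; only the field name is taken from the diff. Comparing a `Long` field to a `long` with `==` unboxes the field, which throws a `NullPointerException` when it is null, so the null check is needed in the `==` form.

```java
import java.util.Objects;

final class SavepointIdCheck {
    // Nullable boxed id, mirroring the field in the diff.
    private Long syncSavepointWithDrain;

    void set(Long id) {
        syncSavepointWithDrain = id;
    }

    // Variant from the diff: explicit null check, then unboxing comparison.
    boolean isCurrentWithNullCheck(long checkpointId) {
        return syncSavepointWithDrain != null && syncSavepointWithDrain == checkpointId;
    }

    // Variant with Objects.equals: checkpointId is boxed to Long, and a null
    // field simply yields false instead of throwing on unboxing.
    boolean isCurrent(long checkpointId) {
        return Objects.equals(syncSavepointWithDrain, checkpointId);
    }
}
```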

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
##########
@@ -281,25 +298,47 @@ protected void declineCheckpoint(long checkpointId) {
         public void run() {
             try {
                 mainOperator.run(lock, operatorChain);
-                if (!wasStoppedExternally && !isCanceled() && !isFailing()) {
-                    synchronized (lock) {
-                        operatorChain.setIgnoreEndOfInput(false);
-                    }
-                    CompletableFuture<Void> endOfDataProcessed = new CompletableFuture<>();
-                    mainMailboxExecutor.execute(
-                            () -> {
-                                endData();
-                                endOfDataProcessed.complete(null);
-                            },
-                            "SourceStreamTask finished processing data.");
-
-                    // wait until all operators are finished
-                    endOfDataProcessed.get();
-                }
+                completeProcessing();
                 completionFuture.complete(null);
             } catch (Throwable t) {
                 // Note, t can be also an InterruptedException
-                completionFuture.completeExceptionally(t);
+                if (isCanceled()
+                        && ExceptionUtils.findThrowable(t, InterruptedException.class)
+                                .isPresent()) {
+                    completionFuture.completeExceptionally(new CancelTaskException(t));
+                } else if (wasDrained
+                        && ExceptionUtils.findThrowable(t, InterruptedException.class)
+                                .isPresent()) {
+                    // if we are stopping the source thread for stop-with-savepoint
+                    // we may actually return from run with an InterruptedException which
+                    // should be ignored
+                    try {
+                        // clear the interrupted status for the thread
+                        Thread.interrupted();
+                        completeProcessing();
+                        completionFuture.complete(null);
+                    } catch (Throwable e) {
+                        completionFuture.completeExceptionally(e);
+                    }

Review comment:
       Why do we need this special treatment? Is this a workaround for [the 
kinesis bug](https://issues.apache.org/jira/browse/FLINK-23528)?
   

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
##########
@@ -281,25 +298,47 @@ protected void declineCheckpoint(long checkpointId) {
         public void run() {
             try {
                 mainOperator.run(lock, operatorChain);
-                if (!wasStoppedExternally && !isCanceled() && !isFailing()) {
-                    synchronized (lock) {
-                        operatorChain.setIgnoreEndOfInput(false);
-                    }
-                    CompletableFuture<Void> endOfDataProcessed = new CompletableFuture<>();
-                    mainMailboxExecutor.execute(
-                            () -> {
-                                endData();
-                                endOfDataProcessed.complete(null);
-                            },
-                            "SourceStreamTask finished processing data.");
-
-                    // wait until all operators are finished
-                    endOfDataProcessed.get();
-                }

Review comment:
       Haven't you introduced this code earlier in this PR? If so, can you 
place it inside the `completeProcessing()` method from the beginning without 
this refactor in the last commit?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -251,8 +252,10 @@
     /** TODO it might be replaced by the global IO executor on TaskManager level future. */
     private final ExecutorService channelIOExecutor;
 
-    private Long syncSavepointId = null;
-    private Long activeSyncSavepointId = null;
+    private Long syncSavepointWithoutDrain = null;
+
+    private Long syncSavepointWithDrain = null;
+    private CompletableFuture<Void> savepointCompletedFuture = null;

Review comment:
       `syncSavepointWithDrainCompletionFuture`?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
##########
@@ -249,10 +248,28 @@ private void interruptSourceThread(boolean interrupt) {
     // ------------------------------------------------------------------------
 
     @Override
-    public Future<Boolean> triggerCheckpointAsync(
+    public CompletableFuture<Boolean> triggerCheckpointAsync(

Review comment:
       nit: this method has grown a bit too much and I would split it somehow. 
Maybe it's good enough to have the mailbox action as a named method instead of 
a lambda.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
##########
@@ -172,14 +178,7 @@ protected void processInput(MailboxDefaultAction.Controller controller) throws E
                 .getCompletionFuture()
                 .whenComplete(
                         (Void ignore, Throwable sourceThreadThrowable) -> {
-                            if (isCanceled()
-                                    && ExceptionUtils.findThrowable(
-                                                    sourceThreadThrowable,
-                                                    InterruptedException.class)
-                                            .isPresent()) {
-                                mailboxProcessor.reportThrowable(
-                                        new CancelTaskException(sourceThreadThrowable));
-                            } else if (!wasStoppedExternally && sourceThreadThrowable != null) {

Review comment:
       What has happened to this code?
   
   edit: ok, I see you've mostly moved it, I think. So this is partially just a 
refactor? Can you extract the refactoring part into an independent commit?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
##########
@@ -249,10 +248,28 @@ private void interruptSourceThread(boolean interrupt) {
     // ------------------------------------------------------------------------
 
     @Override
-    public Future<Boolean> triggerCheckpointAsync(
+    public CompletableFuture<Boolean> triggerCheckpointAsync(
             CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
         if (!externallyInducedCheckpoints) {
-            return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
+            if (checkpointOptions.getCheckpointType().shouldDrain()) {
+                mainMailboxExecutor.execute(
+                        () -> {
+                            setSynchronousSavepoint(checkpointMetaData.getCheckpointId(), true);
+                            wasDrained = true;
+                            if (mainOperator != null) {
+                                mainOperator.stop();
+                            }
+                        },
+                        "stop legacy source and set synchronous savepoint");
+                return sourceThread
+                        .getCompletionFuture()
+                        .thenCompose(

Review comment:
       Is the change from `Future` to `CompletableFuture` in 
`triggerCheckpointAsync` because of this `thenCompose`?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/AbstractCollectResultBuffer.java
##########
@@ -128,6 +128,9 @@ private void addResults(CollectCoordinationResponse response, long responseOffse
         if (!results.isEmpty()) {
             // response contains some data, add them to buffer
             int addStart = (int) (offset - responseOffset);
+            if (addStart > results.size()) {
+                return;
+            }

Review comment:
       Missing unit test?
   
   I'm not sure I understand this fix; maybe a unit test would help me 
understand it?
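   A sketch of the kind of case such a unit test might pin down. `OffsetGuardDemo.newResults` is a hypothetical helper, and it assumes the not-yet-buffered tail of the response is taken with `List.subList`; the real `addResults` may differ. Under that assumption, an `addStart` larger than `results.size()` would make `subList` throw, which the added guard avoids.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class OffsetGuardDemo {
    // Hypothetical helper: returns the part of `results` that lies at or
    // beyond the buffer's current `offset`, where `responseOffset` is the
    // offset of the first element in the response.
    static List<String> newResults(List<String> results, long offset, long responseOffset) {
        int addStart = (int) (offset - responseOffset);
        if (addStart > results.size()) {
            // The response ends before the buffer's offset; nothing to add.
            return Collections.emptyList();
        }
        // Without the guard above, subList would throw when addStart > size.
        return new ArrayList<>(results.subList(addStart, results.size()));
    }
}
```

   A test could then cover three inputs: a response that overlaps the buffer, one that ends exactly at the buffer's offset, and one that ends before it.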

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
##########
@@ -281,11 +277,4 @@ public void abortCheckpointOnBarrier(long checkpointId, CheckpointException caus
         }
         super.abortCheckpointOnBarrier(checkpointId, cause);
     }
-
-    @Override
-    protected void advanceToEndOfEventTime() throws Exception {
-        for (Output<StreamRecord<?>> sourceOutput : operatorChain.getChainedSourceOutputs()) {
-            sourceOutput.emitWatermark(Watermark.MAX_WATERMARK);
-        }
-    }

Review comment:
       ?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -148,8 +149,8 @@
  *       +----> initialize-operator-states()
  *       +----> open-operators()
  *       +----> run()
+ *       +----> finish-operators()
  *       +----> close-operators()
- *       +----> dispose-operators()

Review comment:
       nitty nit: should have been a separate hotfix?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -486,40 +489,48 @@ protected void endData() throws Exception {
         this.endOfDataReceived = true;
     }
 
-    private void resetSynchronousSavepointId(long id, boolean succeeded) {
-        if (!succeeded && activeSyncSavepointId != null && activeSyncSavepointId == id) {
-            // allow to process further EndOfPartition events
-            activeSyncSavepointId = null;
-            operatorChain.setIgnoreEndOfInput(false);
-        }
-        syncSavepointId = null;
-    }
-
-    private void setSynchronousSavepointId(long checkpointId, boolean ignoreEndOfInput) {
+    protected void setSynchronousSavepoint(long checkpointId, boolean isDrain) {
         checkState(
-                syncSavepointId == null,
+                syncSavepointWithoutDrain == null
+                        && (syncSavepointWithDrain == null
+                                || (isDrain && syncSavepointWithDrain == checkpointId)),

Review comment:
       why this `|| (isDrain && syncSavepointWithDrain == checkpointId)),` part?



