This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 29f9918  [FLINK-18074][checkpoint] Ensure task could fail when 
exception thrown out on notified of checkpoint completed/aborted
29f9918 is described below

commit 29f99180464e4dd1aea9e7b28a2616dda53adb04
Author: Yun Tang <[email protected]>
AuthorDate: Wed Jun 3 23:49:45 2020 +0800

    [FLINK-18074][checkpoint] Ensure task could fail when exception thrown out 
on notified of checkpoint completed/aborted
---
 .../flink/streaming/runtime/tasks/StreamTask.java  | 38 ++++++++++++-----
 .../streaming/runtime/tasks/StreamTaskTest.java    | 48 ++++++++++++++++++++++
 2 files changed, 76 insertions(+), 10 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 7ebb9e5..d9add56 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -76,6 +76,7 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.function.RunnableWithException;
 import org.apache.flink.util.function.ThrowingRunnable;
 
 import org.slf4j.Logger;
@@ -923,9 +924,33 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
 
        @Override
        public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {
-               return 
mailboxProcessor.getMailboxExecutor(TaskMailbox.MAX_PRIORITY).submit(
-                               () -> notifyCheckpointComplete(checkpointId),
-                               "checkpoint %d complete", checkpointId);
+               return notifyCheckpointOperation(
+                       () -> notifyCheckpointComplete(checkpointId),
+                       String.format("checkpoint %d complete", checkpointId));
+       }
+
+       @Override
+       public Future<Void> notifyCheckpointAbortAsync(long checkpointId) {
+               return notifyCheckpointOperation(
+                       () -> 
subtaskCheckpointCoordinator.notifyCheckpointAborted(checkpointId, 
operatorChain, this::isRunning),
+                       String.format("checkpoint %d aborted", checkpointId));
+       }
+
+       private Future<Void> notifyCheckpointOperation(RunnableWithException 
runnable, String description) {
+               CompletableFuture<Void> result = new CompletableFuture<>();
+               
mailboxProcessor.getMailboxExecutor(TaskMailbox.MAX_PRIORITY).execute(
+                       () -> {
+                               try {
+                                       runnable.run();
+                               }
+                               catch (Exception ex) {
+                                       result.completeExceptionally(ex);
+                                       throw ex;
+                               }
+                               result.complete(null);
+                       },
+                       description);
+               return result;
        }
 
        private void notifyCheckpointComplete(long checkpointId) throws 
Exception {
@@ -937,13 +962,6 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                }
        }
 
-       @Override
-       public Future<Void> notifyCheckpointAbortAsync(long checkpointId) {
-               return 
mailboxProcessor.getMailboxExecutor(TaskMailbox.MAX_PRIORITY).submit(
-                       () -> 
subtaskCheckpointCoordinator.notifyCheckpointAborted(checkpointId, 
operatorChain, this::isRunning),
-                       "checkpoint %d aborted", checkpointId);
-       }
-
        private void tryShutdownTimerService() {
 
                if (!timerService.isTerminated()) {
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index e46dc0b..023f801 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -148,6 +148,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 
 import static 
org.apache.flink.runtime.checkpoint.StateObjectCollection.singleton;
 import static org.apache.flink.streaming.util.StreamTaskUtil.waitTaskIsRunning;
@@ -969,6 +970,34 @@ public class StreamTaskTest extends TestLogger {
                assertEquals(true, operator.closed.get());
        }
 
+       @Test
+       public void testFailToConfirmCheckpointCompleted() throws Exception {
+               testFailToConfirmCheckpointMessage(streamTask -> 
streamTask.notifyCheckpointCompleteAsync(1L));
+       }
+
+       @Test
+       public void testFailToConfirmCheckpointAborted() throws Exception {
+               testFailToConfirmCheckpointMessage(streamTask -> 
streamTask.notifyCheckpointAbortAsync(1L));
+       }
+
+       private void testFailToConfirmCheckpointMessage(Consumer<StreamTask<?, 
?>> consumer) throws Exception {
+               FailOnNotifyCheckpointOperator<Integer> operator = new 
FailOnNotifyCheckpointOperator<>();
+               MultipleInputStreamTaskTestHarnessBuilder<Integer> builder =
+                       new 
MultipleInputStreamTaskTestHarnessBuilder<>(OneInputStreamTask::new, 
BasicTypeInfo.INT_TYPE_INFO)
+                               .addInput(BasicTypeInfo.INT_TYPE_INFO);
+               StreamTaskMailboxTestHarness<Integer> harness = builder
+                       .setupOutputForSingletonOperatorChain(operator)
+                       .build();
+
+               try {
+                       consumer.accept(harness.streamTask);
+                       harness.streamTask.runMailboxStep();
+                       fail();
+               } catch (ExpectedTestException expected) {
+                       // expected exception
+               }
+       }
+
        /**
         * Tests that checkpoints are declined if operators are (partially) 
closed.
         *
@@ -2017,4 +2046,23 @@ public class StreamTaskTest extends TestLogger {
                public void processElement(StreamRecord<T> element) throws 
Exception {
                }
        }
+
+       private static class FailOnNotifyCheckpointOperator<T> extends 
AbstractStreamOperator<T> implements OneInputStreamOperator<T, T> {
+               @Override
+               public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
+                       super.notifyCheckpointComplete(checkpointId);
+                       throw new ExpectedTestException();
+               }
+
+               @Override
+               public void notifyCheckpointAborted(long checkpointId) throws 
Exception {
+                       super.notifyCheckpointAborted(checkpointId);
+                       throw new ExpectedTestException();
+               }
+
+               @Override
+               public void processElement(StreamRecord<T> element) throws 
Exception {
+
+               }
+       }
 }

Reply via email to