This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5b24aa0302074a5eaf6c1b47c2764e9883b674ab
Author: Stefan Richter <s.rich...@data-artisans.com>
AuthorDate: Fri Jun 21 18:07:03 2019 +0200

    [FLINK-12804] Introduce mailbox-based ExecutorService
---
 .../flink/state/api/output/BoundedStreamTask.java  |   3 +-
 .../flink/runtime/concurrent/FutureUtils.java      |  26 ++
 .../streaming/runtime/tasks/SourceStreamTask.java  |  66 ++---
 .../runtime/tasks/StreamIterationHead.java         |   3 +-
 .../flink/streaming/runtime/tasks/StreamTask.java  |  94 ++-----
 .../streaming/runtime/tasks/mailbox/Mailbox.java   |  78 +++++-
 .../runtime/tasks/mailbox/MailboxImpl.java         | 195 ++++++++++++--
 .../runtime/tasks/mailbox/MailboxReceiver.java     |   6 +-
 .../runtime/tasks/mailbox/MailboxSender.java       |   6 +-
 .../{Mailbox.java => MailboxStateException.java}   |  29 +-
 .../DefaultActionContext.java}                     |  24 +-
 .../MailboxDefaultAction.java}                     |  19 +-
 .../tasks/mailbox/execution/MailboxExecutor.java   |  86 ++++++
 .../MailboxExecutorService.java}                   |  19 +-
 .../execution/MailboxExecutorServiceImpl.java      | 157 +++++++++++
 .../tasks/mailbox/execution/MailboxProcessor.java  | 295 +++++++++++++++++++++
 .../SuspendedMailboxDefaultAction.java}            |  16 +-
 .../tasks/StreamTaskSelectiveReadingTest.java      |   3 +-
 .../runtime/tasks/StreamTaskTerminationTest.java   |   3 +-
 .../streaming/runtime/tasks/StreamTaskTest.java    |  13 +-
 .../runtime/tasks/SynchronousCheckpointITCase.java |   5 +-
 .../runtime/tasks/SynchronousCheckpointTest.java   |   3 +-
 .../tasks/TaskCheckpointingBehaviourTest.java      |   3 +-
 .../runtime/tasks/mailbox/MailboxImplTest.java     | 185 ++++++++++++-
 .../execution/MailboxExecutorServiceImplTest.java  | 178 +++++++++++++
 .../mailbox/execution/MailboxProcessorTest.java    | 265 ++++++++++++++++++
 .../mailbox/execution/TestMailboxExecutor.java     |  70 +++++
 .../flink/streaming/util/MockStreamTask.java       |   3 +-
 .../jobmaster/JobMasterStopWithSavepointIT.java    |   5 +-
 29 files changed, 1636 insertions(+), 222 deletions(-)

diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
index db663da..2ce9626 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
@@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import 
org.apache.flink.streaming.runtime.tasks.mailbox.execution.DefaultActionContext;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.Preconditions;
@@ -76,7 +77,7 @@ class BoundedStreamTask<IN, OUT, OP extends 
OneInputStreamOperator<IN, OUT> & Bo
        }
 
        @Override
-       protected void performDefaultAction(ActionContext context) throws 
Exception {
+       protected void performDefaultAction(DefaultActionContext context) 
throws Exception {
                if (input.hasNext()) {
                        reuse.replace(input.next());
                        headOperator.setKeyContextElement1(reuse);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index c1613c5..1db1d2a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -28,9 +28,12 @@ import org.apache.flink.util.function.SupplierWithException;
 
 import akka.dispatch.OnComplete;
 
+import javax.annotation.Nonnull;
+
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -995,4 +998,27 @@ public class FutureUtils {
                        }
                });
        }
+
+       /**
+        * Cancels all instances of {@link java.util.concurrent.Future} in the 
given list of runnables without interrupting.
+        * This method will suppress unexpected exceptions until the whole list 
is processed and then rethrow.
+        *
+        * @param runnables list of {@link Runnable} candidates to cancel.
+        */
+       public static void cancelRunnableFutures(@Nonnull List<Runnable> 
runnables) {
+               RuntimeException suppressedExceptions = null;
+               for (Runnable runnable : runnables) {
+                       if (runnable instanceof java.util.concurrent.Future) {
+                               try {
+                                       ((java.util.concurrent.Future<?>) 
runnable).cancel(false);
+                               } catch (RuntimeException ex) {
+                                       // safety net to ensure all candidates 
get cancelled before we let the exception bubble up.
+                                       suppressedExceptions = 
ExceptionUtils.firstOrSuppressed(ex, suppressedExceptions);
+                               }
+                       }
+               }
+               if (suppressedExceptions != null) {
+                       throw suppressedExceptions;
+               }
+       }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 73f6bc4..50fdca1 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -25,8 +25,11 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.streaming.api.checkpoint.ExternallyInducedSource;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSource;
+import 
org.apache.flink.streaming.runtime.tasks.mailbox.execution.DefaultActionContext;
 import org.apache.flink.util.FlinkException;
 
+import java.util.concurrent.CompletableFuture;
+
 /**
  * {@link StreamTask} for executing a {@link StreamSource}.
  *
@@ -45,8 +48,6 @@ import org.apache.flink.util.FlinkException;
 public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends 
StreamSource<OUT, SRC>>
        extends StreamTask<OUT, OP> {
 
-       private static final Runnable SOURCE_POISON_LETTER = () -> {};
-
        private volatile boolean externallyInducedCheckpoints;
 
        public SourceStreamTask(Environment env) {
@@ -100,42 +101,21 @@ public class SourceStreamTask<OUT, SRC extends 
SourceFunction<OUT>, OP extends S
        }
 
        @Override
-       protected void performDefaultAction(ActionContext context) throws 
Exception {
+       protected void performDefaultAction(DefaultActionContext context) 
throws Exception {
+
+               context.suspendDefaultAction();
+
                // Against the usual contract of this method, this 
implementation is not step-wise but blocking instead for
                // compatibility reasons with the current source interface 
(source functions run as a loop, not in steps).
-               final LegacySourceFunctionThread sourceThread = new 
LegacySourceFunctionThread();
+               final LegacySourceFunctionThread sourceThread = new 
LegacySourceFunctionThread(getName());
                sourceThread.start();
-
-               // We run an alternative mailbox loop that does not involve 
default actions and synchronizes around actions.
-               try {
-                       runAlternativeMailboxLoop();
-               } catch (Exception mailboxEx) {
-                       // We cancel the source function if some runtime 
exception escaped the mailbox.
-                       if (!isCanceled()) {
-                               cancelTask();
-                       }
-                       throw mailboxEx;
-               }
-
-               sourceThread.join();
-               sourceThread.checkThrowSourceExecutionException();
-
-               context.allActionsCompleted();
-       }
-
-       private void runAlternativeMailboxLoop() throws InterruptedException {
-
-               while (true) {
-
-                       Runnable letter = mailbox.takeMail();
-                       if (letter == SOURCE_POISON_LETTER) {
-                               break;
+               sourceThread.getCompletionFuture().whenComplete((Void ignore, 
Throwable sourceThreadThrowable) -> {
+                       if (sourceThreadThrowable == null) {
+                               mailboxProcessor.allActionsCompleted();
+                       } else {
+                               
mailboxProcessor.reportThrowable(sourceThreadThrowable);
                        }
-
-                       synchronized (getCheckpointLock()) {
-                               letter.run();
-                       }
-               }
+               });
        }
 
        @Override
@@ -172,27 +152,25 @@ public class SourceStreamTask<OUT, SRC extends 
SourceFunction<OUT>, OP extends S
         */
        private class LegacySourceFunctionThread extends Thread {
 
-               private Throwable sourceExecutionThrowable;
+               private final CompletableFuture<Void> completionFuture;
 
-               LegacySourceFunctionThread() {
-                       this.sourceExecutionThrowable = null;
+               LegacySourceFunctionThread(String taskDescription) {
+                       super("Legacy Source Thread - " + taskDescription);
+                       this.completionFuture = new CompletableFuture<>();
                }
 
                @Override
                public void run() {
                        try {
                                headOperator.run(getCheckpointLock(), 
getStreamStatusMaintainer(), operatorChain);
+                               completionFuture.complete(null);
                        } catch (Throwable t) {
-                               sourceExecutionThrowable = t;
-                       } finally {
-                               mailbox.clearAndPut(SOURCE_POISON_LETTER);
+                               completionFuture.completeExceptionally(t);
                        }
                }
 
-               void checkThrowSourceExecutionException() throws Exception {
-                       if (sourceExecutionThrowable != null) {
-                               throw new Exception(sourceExecutionThrowable);
-                       }
+               CompletableFuture<Void> getCompletionFuture() {
+                       return completionFuture;
                }
        }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
index d25bd23..4a031af 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
 import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.flink.streaming.runtime.tasks.mailbox.execution.DefaultActionContext;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import org.slf4j.Logger;
@@ -66,7 +67,7 @@ public class StreamIterationHead<OUT> extends 
OneInputStreamTask<OUT, OUT> {
        // 
------------------------------------------------------------------------
 
        @Override
-       protected void performDefaultAction(ActionContext context) throws 
Exception {
+       protected void performDefaultAction(DefaultActionContext context) 
throws Exception {
                StreamRecord<OUT> nextRecord = shouldWait ?
                        dataChannel.poll(iterationWaitTime, 
TimeUnit.MILLISECONDS) :
                        dataChannel.take();
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 8ba2a44..a626bba 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -58,11 +58,11 @@ import 
org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitio
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
-import org.apache.flink.streaming.runtime.tasks.mailbox.Mailbox;
-import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxImpl;
+import 
org.apache.flink.streaming.runtime.tasks.mailbox.execution.DefaultActionContext;
+import 
org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxExecutor;
+import 
org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxProcessor;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.function.ThrowingRunnable;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -74,7 +74,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
@@ -129,13 +128,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
        public static final ThreadGroup TRIGGER_THREAD_GROUP = new 
ThreadGroup("Triggers");
 
        /** The logger used by the StreamTask and its subclasses. */
-       private static final Logger LOG = 
LoggerFactory.getLogger(StreamTask.class);
-
-       /** Special value, letter that terminates the mailbox loop. */
-       private static final Runnable POISON_LETTER = () -> {};
-
-       /** Special value, letter that "wakes up" a waiting mailbox loop. */
-       private static final Runnable DEFAULT_ACTION_AVAILABLE = () -> {};
+       protected static final Logger LOG = 
LoggerFactory.getLogger(StreamTask.class);
 
        // 
------------------------------------------------------------------------
 
@@ -200,7 +193,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
 
        private final SynchronousSavepointLatch syncSavepointLatch;
 
-       protected final Mailbox mailbox;
+       protected final MailboxProcessor mailboxProcessor;
 
        // 
------------------------------------------------------------------------
 
@@ -247,7 +240,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                this.accumulatorMap = 
getEnvironment().getAccumulatorRegistry().getUserMap();
                this.recordWriters = createRecordWriters(configuration, 
environment);
                this.syncSavepointLatch = new SynchronousSavepointLatch();
-               this.mailbox = new MailboxImpl();
+               this.mailboxProcessor = new 
MailboxProcessor(this::performDefaultAction);
        }
 
        // 
------------------------------------------------------------------------
@@ -272,34 +265,13 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
         * @param context context object for collaborative interaction between 
the action and the stream task.
         * @throws Exception on any problems in the action.
         */
-       protected void performDefaultAction(ActionContext context) throws 
Exception {
+       protected void performDefaultAction(DefaultActionContext context) 
throws Exception {
                if (!inputProcessor.processInput()) {
                        context.allActionsCompleted();
                }
        }
 
        /**
-        * Runs the stream-tasks main processing loop.
-        */
-       private void run() throws Exception {
-               final ActionContext actionContext = new ActionContext();
-               while (true) {
-                       if (mailbox.hasMail()) {
-                               Optional<Runnable> maybeLetter;
-                               while ((maybeLetter = 
mailbox.tryTakeMail()).isPresent()) {
-                                       Runnable letter = maybeLetter.get();
-                                       if (letter == POISON_LETTER) {
-                                               return;
-                                       }
-                                       letter.run();
-                               }
-                       }
-
-                       performDefaultAction(actionContext);
-               }
-       }
-
-       /**
         * Emits the {@link 
org.apache.flink.streaming.api.watermark.Watermark#MAX_WATERMARK MAX_WATERMARK}
         * so that all registered timers are fired.
         *
@@ -381,6 +353,9 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                        // -------- Invoke --------
                        LOG.debug("Invoking {}", getName());
 
+                       // open mailbox
+                       mailboxProcessor.open();
+
                        // we need to make sure that any triggers scheduled in 
open() cannot be
                        // executed before all operators are opened
                        synchronized (lock) {
@@ -400,7 +375,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
 
                        // let the task do its work
                        isRunning = true;
-                       run();
+                       mailboxProcessor.runMailboxLoop();
 
                        // if this left the run() method cleanly despite the 
fact that this was canceled,
                        // make sure the "clean shutdown" is not attempted
@@ -422,6 +397,9 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                                // make sure no new timers can come
                                timerService.quiesce();
 
+                               // let mailbox execution reject all new letters 
from this point
+                               mailboxProcessor.prepareClose();
+
                                // only set the StreamTask to not running after 
all operators have been closed!
                                // See FLINK-7430
                                isRunning = false;
@@ -489,12 +467,13 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                                        operatorChain.releaseOutputs();
                                }
                        }
+
+                       mailboxProcessor.close();
                }
        }
 
        @Override
        public final void cancel() throws Exception {
-               mailbox.clearAndPut(POISON_LETTER);
                isRunning = false;
                canceled = true;
 
@@ -505,10 +484,15 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                        cancelTask();
                }
                finally {
+                       mailboxProcessor.cancelMailboxExecution();
                        cancelables.close();
                }
        }
 
+       public MailboxExecutor getTaskMailboxExecutor() {
+               return mailboxProcessor.getMailboxExecutor();
+       }
+
        public final boolean isRunning() {
                return isRunning;
        }
@@ -1332,40 +1316,4 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                
output.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());
                return output;
        }
-
-       /**
-        * The action context is passed as parameter into the default action 
method and holds control methods for feedback
-        * of from the default action to the mailbox.
-        */
-       public final class ActionContext {
-
-               private final Runnable actionUnavailableLetter = 
ThrowingRunnable.unchecked(() -> mailbox.takeMail().run());
-
-               /**
-                * This method must be called to end the stream task when all 
actions for the tasks have been performed.
-                */
-               public void allActionsCompleted() {
-                       mailbox.clearAndPut(POISON_LETTER);
-               }
-
-               /**
-                * Calling this method signals that the mailbox-thread should 
continue invoking the default action, e.g. because
-                * new input became available for processing.
-                *
-                * @throws InterruptedException on interruption.
-                */
-               public void actionsAvailable() throws InterruptedException {
-                       mailbox.putMail(DEFAULT_ACTION_AVAILABLE);
-               }
-
-               /**
-                * Calling this method signals that the mailbox-thread should 
(temporarily) stop invoking the default action,
-                * e.g. because there is currently no input available.
-                *
-                * @throws InterruptedException on interruption.
-                */
-               public void actionsUnavailable() throws InterruptedException {
-                       mailbox.putMail(actionUnavailableLetter);
-               }
-       }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java
index dfa8d76..9b2d95f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java
@@ -20,17 +20,85 @@ package org.apache.flink.streaming.runtime.tasks.mailbox;
 
 import javax.annotation.Nonnull;
 
+import java.util.List;
+
 /**
  * A mailbox is basically a blocking queue for inter-thread message exchange 
in form of {@link Runnable} objects between
- * multiple producer threads and a single consumer.
+ * multiple producer threads and a single consumer. This has a lifecycle of 
closed -> open -> (quiesced) -> closed.
  */
 public interface Mailbox extends MailboxReceiver, MailboxSender {
 
        /**
-        * The effect of this is that all pending letters are dropped and the 
given priorityAction
-        * is enqueued to the head of the mailbox.
+        * This enum represents the states of the mailbox lifecycle.
+        */
+       enum State {
+               OPEN, QUIESCED, CLOSED
+       }
+
+       /**
+        * Open the mailbox. In this state, the mailbox supports put and take 
operations.
+        */
+       void open();
+
+       /**
+        * Quiesce the mailbox. In this state, the mailbox supports only take 
operations and all pending and future put
+        * operations will throw {@link MailboxStateException}.
+        */
+       void quiesce();
+
+       /**
+        * Close the mailbox. In this state, all pending and future put 
operations and all pending and future take
+        * operations will throw {@link MailboxStateException}. Returns all 
letters that were still enqueued.
+        *
+        * @return list with all letters that where enqueued in the mailbox at 
the time of closing.
+        */
+       @Nonnull
+       List<Runnable> close();
+
+       /**
+        * The effect of this is that all pending letters in the mailbox are 
dropped and the given priorityLetter
+        * is enqueued to the head of the mailbox. Dropped letters are 
returned. This method should only be invoked
+        * by code that has ownership of the mailbox object and only rarely 
used, e.g. to submit special events like
+        * shutting down the mailbox loop.
+        *
+        * @param priorityLetter action to enqueue atomically after the mailbox 
was cleared.
+        * @throws MailboxStateException if the mailbox is quiesced or closed.
+        */
+       @Nonnull
+       List<Runnable> clearAndPut(@Nonnull Runnable priorityLetter) throws 
MailboxStateException;
+
+       /**
+        * Adds the given action to the head of the mailbox. This method will 
block if the mailbox is full and
+        * should therefore only be called from outside the mailbox main-thread 
to avoid deadlocks.
+        *
+        * @param priorityLetter action to enqueue to the head of the mailbox.
+        * @throws InterruptedException on interruption.
+        * @throws MailboxStateException if the mailbox is quiesced or closed.
+        */
+       void putFirst(@Nonnull Runnable priorityLetter) throws 
InterruptedException, MailboxStateException;
+
+       /**
+        * Adds the given action to the head of the mailbox if the mailbox is 
not full. Returns true if the letter
+        * was successfully added to the mailbox.
+        *
+        * @param priorityLetter action to enqueue to the head of the mailbox.
+        * @return true if the letter was successfully added.
+        * @throws MailboxStateException if the mailbox is quiesced or closed.
+        */
+       boolean tryPutFirst(@Nonnull Runnable priorityLetter) throws 
MailboxStateException;
+
+       /**
+        * Returns the current state of the mailbox as defined by the lifecycle 
enum {@link State}.
+        *
+        * @return the current state of the mailbox.
+        */
+       @Nonnull
+       State getState();
+
+       /**
+        * Returns the total capacity of the mailbox.
         *
-        * @param priorityAction action to enqueue atomically after the mailbox 
was cleared.
+        * @return the total capacity of the mailbox.
         */
-       void clearAndPut(@Nonnull Runnable priorityAction);
+       int capacity();
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java
index e9bd346..0f6bbf0 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java
@@ -24,6 +24,8 @@ import javax.annotation.Nonnull;
 import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.ThreadSafe;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
@@ -77,6 +79,12 @@ public class MailboxImpl implements Mailbox {
        private volatile int count;
 
        /**
+        * The state of the mailbox in the lifecycle of open, quiesced, and 
closed.
+        */
+       @GuardedBy("lock")
+       private volatile State state;
+
+       /**
         * A mask to wrap around the indexes of the ring buffer. We use this to 
avoid ifs or modulo ops.
         */
        private final int moduloMask;
@@ -93,6 +101,7 @@ public class MailboxImpl implements Mailbox {
                this.lock = new ReentrantLock();
                this.notEmpty = lock.newCondition();
                this.notFull = lock.newCondition();
+               this.state = State.CLOSED;
        }
 
        @Override
@@ -101,11 +110,16 @@ public class MailboxImpl implements Mailbox {
        }
 
        @Override
-       public Optional<Runnable> tryTakeMail() {
+       public Optional<Runnable> tryTakeMail() throws MailboxStateException {
                final ReentrantLock lock = this.lock;
                lock.lock();
                try {
-                       return isEmpty() ? Optional.empty() : 
Optional.of(takeInternal());
+                       if (isEmpty()) {
+                               checkTakeStateConditions();
+                               return Optional.empty();
+                       } else {
+                               return Optional.of(takeInternal());
+                       }
                } finally {
                        lock.unlock();
                }
@@ -113,11 +127,12 @@ public class MailboxImpl implements Mailbox {
 
        @Nonnull
        @Override
-       public Runnable takeMail() throws InterruptedException {
+       public Runnable takeMail() throws InterruptedException, 
MailboxStateException {
                final ReentrantLock lock = this.lock;
                lock.lockInterruptibly();
                try {
                        while (isEmpty()) {
+                               checkTakeStateConditions();
                                notEmpty.await();
                        }
                        return takeInternal();
@@ -129,14 +144,15 @@ public class MailboxImpl implements Mailbox {
        
//------------------------------------------------------------------------------------------------------------------
 
        @Override
-       public boolean tryPutMail(@Nonnull Runnable letter) {
+       public boolean tryPutMail(@Nonnull Runnable letter) throws 
MailboxStateException {
                final ReentrantLock lock = this.lock;
                lock.lock();
                try {
                        if (isFull()) {
+                               checkPutStateConditions();
                                return false;
                        } else {
-                               putInternal(letter);
+                               putTailInternal(letter);
                                return true;
                        }
                } finally {
@@ -145,14 +161,15 @@ public class MailboxImpl implements Mailbox {
        }
 
        @Override
-       public void putMail(@Nonnull Runnable letter) throws 
InterruptedException {
+       public void putMail(@Nonnull Runnable letter) throws 
InterruptedException, MailboxStateException {
                final ReentrantLock lock = this.lock;
                lock.lockInterruptibly();
                try {
                        while (isFull()) {
+                               checkPutStateConditions();
                                notFull.await();
                        }
-                       putInternal(letter);
+                       putTailInternal(letter);
                } finally {
                        lock.unlock();
                }
@@ -160,16 +177,79 @@ public class MailboxImpl implements Mailbox {
 
        
//------------------------------------------------------------------------------------------------------------------
 
-       private void putInternal(Runnable letter) {
+       @Nonnull
+       @Override
+       public List<Runnable> clearAndPut(@Nonnull Runnable priorityLetter) 
throws MailboxStateException {
+               ArrayList<Runnable> droppedLetters = new 
ArrayList<>(capacity());
+
+               lock.lock();
+               try {
+                       // check state first to avoid loosing any letters 
forever through exception
+                       checkPutStateConditions();
+                       dropAllLetters(droppedLetters);
+                       putTailInternal(priorityLetter);
+               } finally {
+                       lock.unlock();
+               }
+
+               return droppedLetters;
+       }
+
+       @Override
+       public void putFirst(@Nonnull Runnable priorityLetter) throws 
InterruptedException, MailboxStateException {
+               final ReentrantLock lock = this.lock;
+               lock.lockInterruptibly();
+               try {
+                       while (isFull()) {
+                               checkPutStateConditions();
+                               notFull.await();
+                       }
+                       putHeadInternal(priorityLetter);
+               } finally {
+                       lock.unlock();
+               }
+       }
+
+       @Override
+       public boolean tryPutFirst(@Nonnull Runnable priorityLetter) throws 
MailboxStateException {
+               final ReentrantLock lock = this.lock;
+               lock.lock();
+               try {
+                       if (isFull()) {
+                               checkPutStateConditions();
+                               return false;
+                       } else {
+                               putHeadInternal(priorityLetter);
+                               return true;
+                       }
+               } finally {
+                       lock.unlock();
+               }
+       }
+
+       
//------------------------------------------------------------------------------------------------------------------
+
+       private void putHeadInternal(Runnable letter) throws 
MailboxStateException {
                assert lock.isHeldByCurrentThread();
+               checkPutStateConditions();
+               headIndex = decreaseIndexWithWrapAround(headIndex);
+               this.ringBuffer[headIndex] = letter;
+               ++count;
+               notEmpty.signal();
+       }
+
+       private void putTailInternal(Runnable letter) throws 
MailboxStateException {
+               assert lock.isHeldByCurrentThread();
+               checkPutStateConditions();
                this.ringBuffer[tailIndex] = letter;
                tailIndex = increaseIndexWithWrapAround(tailIndex);
                ++count;
                notEmpty.signal();
        }
 
-       private Runnable takeInternal() {
+       private Runnable takeInternal() throws MailboxStateException {
                assert lock.isHeldByCurrentThread();
+               checkTakeStateConditions();
                final Runnable[] buffer = this.ringBuffer;
                Runnable letter = buffer[headIndex];
                buffer[headIndex] = null;
@@ -179,32 +259,111 @@ public class MailboxImpl implements Mailbox {
                return letter;
        }
 
+       private void dropAllLetters(List<Runnable> dropInto) {
+               assert lock.isHeldByCurrentThread();
+               int localCount = count;
+               while (localCount > 0) {
+                       dropInto.add(ringBuffer[headIndex]);
+                       ringBuffer[headIndex] = null;
+                       headIndex = increaseIndexWithWrapAround(headIndex);
+                       --localCount;
+                       notFull.signal();
+               }
+               count = 0;
+       }
+
        private int increaseIndexWithWrapAround(int old) {
                return (old + 1) & moduloMask;
        }
 
+       private int decreaseIndexWithWrapAround(int old) {
+               return (old - 1) & moduloMask;
+       }
+
        private boolean isFull() {
-               return count >= ringBuffer.length;
+               return count >= capacity();
        }
 
        private boolean isEmpty() {
                return count == 0;
        }
 
+       private boolean isPutAbleState() {
+               return state == State.OPEN;
+       }
+
+       private boolean isTakeAbleState() {
+               return state != State.CLOSED;
+       }
+
+       private void checkPutStateConditions() throws MailboxStateException {
+               final State state = this.state;
+               if (!isPutAbleState()) {
+                       throw new MailboxStateException("Mailbox is in state " 
+ state + ", but is required to be in state " +
+                               State.OPEN + " for put operations.");
+               }
+       }
+
+       private void checkTakeStateConditions() throws MailboxStateException {
+               final State state = this.state;
+               if (!isTakeAbleState()) {
+                       throw new MailboxStateException("Mailbox is in state " 
+ state + ", but is required to be in state " +
+                               State.OPEN + " or " + State.QUIESCED + " for 
take operations.");
+               }
+       }
+
+       @Override
+       public void open() {
+               lock.lock();
+               try {
+                       if (state == State.CLOSED) {
+                               state = State.OPEN;
+                       }
+               } finally {
+                       lock.unlock();
+               }
+       }
+
        @Override
-       public void clearAndPut(@Nonnull Runnable shutdownAction) {
+       public void quiesce() {
                lock.lock();
                try {
-                       int localCount = count;
-                       while (localCount > 0) {
-                               ringBuffer[headIndex] = null;
-                               headIndex = 
increaseIndexWithWrapAround(headIndex);
-                               --localCount;
+                       if (state == State.OPEN) {
+                               state = State.QUIESCED;
                        }
-                       count = 0;
-                       putInternal(shutdownAction);
+                       notFull.signalAll();
                } finally {
                        lock.unlock();
                }
        }
+
+       @Nonnull
+       @Override
+       public List<Runnable> close() {
+               final ArrayList<Runnable> droppedLetters = new 
ArrayList<>(capacity());
+
+               lock.lock();
+               try {
+                       dropAllLetters(droppedLetters);
+                       state = State.CLOSED;
+                       // to unblock all
+                       notFull.signalAll();
+                       notEmpty.signalAll();
+               } finally {
+                       lock.unlock();
+               }
+
+               return droppedLetters;
+       }
+
+       @Nonnull
+       @Override
+       public State getState() {
+               return state;
+       }
+
+       @Override
+       public int capacity() {
+               return ringBuffer.length;
+       }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxReceiver.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxReceiver.java
index 2d2f112..fc7c754 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxReceiver.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxReceiver.java
@@ -39,15 +39,17 @@ public interface MailboxReceiver {
         *
         * @return an optional with either the oldest letter from the mailbox 
(head of queue) if the mailbox is not empty or
         * an empty optional otherwise.
+        * @throws  MailboxStateException if mailbox is already closed.
         */
-       Optional<Runnable> tryTakeMail();
+       Optional<Runnable> tryTakeMail() throws MailboxStateException;
 
        /**
         * This method returns the oldest letter from the mailbox (head of 
queue) or blocks until a letter is available.
         *
         * @return the oldest letter from the mailbox (head of queue).
         * @throws InterruptedException on interruption.
+        * @throws  MailboxStateException if mailbox is already closed.
         */
        @Nonnull
-       Runnable takeMail() throws InterruptedException;
+       Runnable takeMail() throws InterruptedException, MailboxStateException;
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java
index 36d10a1..2a2274a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java
@@ -32,14 +32,16 @@ public interface MailboxSender {
         *
         * @param letter the letter to enqueue.
         * @return <code>true</code> iff successful.
+        * @throws MailboxStateException if the mailbox is quiesced or closed.
         */
-       boolean tryPutMail(@Nonnull Runnable letter);
+       boolean tryPutMail(@Nonnull Runnable letter) throws 
MailboxStateException;
 
        /**
         * Enqueues the given letter to the mailbox and blocks until there is 
capacity for a successful put.
         *
         * @param letter the letter to enqueue.
         * @throws InterruptedException on interruption.
+        * @throws MailboxStateException if the mailbox is quiesced or closed.
         */
-       void putMail(@Nonnull Runnable letter) throws InterruptedException;
+       void putMail(@Nonnull Runnable letter) throws InterruptedException,  
MailboxStateException;
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxStateException.java
similarity index 60%
copy from 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java
copy to 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxStateException.java
index dfa8d76..a3168c7 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxStateException.java
@@ -18,19 +18,24 @@
 
 package org.apache.flink.streaming.runtime.tasks.mailbox;
 
-import javax.annotation.Nonnull;
-
 /**
- * A mailbox is basically a blocking queue for inter-thread message exchange 
in form of {@link Runnable} objects between
- * multiple producer threads and a single consumer.
+ * This exception signals that a method of the mailbox was invoked in a state 
that does not support the invocation,
+ * e.g. on the attempt to put a letter into a closed mailbox.
  */
-public interface Mailbox extends MailboxReceiver, MailboxSender {
+public class MailboxStateException extends Exception {
+
+       MailboxStateException() {
+       }
+
+       MailboxStateException(String message) {
+               super(message);
+       }
+
+       MailboxStateException(String message, Throwable cause) {
+               super(message, cause);
+       }
 
-       /**
-        * The effect of this is that all pending letters are dropped and the 
given priorityAction
-        * is enqueued to the head of the mailbox.
-        *
-        * @param priorityAction action to enqueue atomically after the mailbox 
was cleared.
-        */
-       void clearAndPut(@Nonnull Runnable priorityAction);
+       MailboxStateException(Throwable cause) {
+               super(cause);
+       }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/DefaultActionContext.java
similarity index 50%
copy from 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java
copy to 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/DefaultActionContext.java
index dfa8d76..ba933e9 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/DefaultActionContext.java
@@ -16,21 +16,23 @@
  * limitations under the License.
  */
 
-package org.apache.flink.streaming.runtime.tasks.mailbox;
-
-import javax.annotation.Nonnull;
+package org.apache.flink.streaming.runtime.tasks.mailbox.execution;
 
 /**
- * A mailbox is basically a blocking queue for inter-thread message exchange 
in form of {@link Runnable} objects between
- * multiple producer threads and a single consumer.
+ * This context is a feedback interface for the default action to interact 
with the mailbox execution. In particular
+ * it offers ways to signal that the execution of the default action should be 
finished or temporarily suspended.
  */
-public interface Mailbox extends MailboxReceiver, MailboxSender {
+public interface DefaultActionContext {
+
+       /**
+        * This method must be called to end the stream task when all actions 
for the tasks have been performed. This
+        * method can be invoked from any thread.
+        */
+       void allActionsCompleted();
 
        /**
-        * The effect of this is that all pending letters are dropped and the 
given priorityAction
-        * is enqueued to the head of the mailbox.
-        *
-        * @param priorityAction action to enqueue atomically after the mailbox 
was cleared.
+        * Calling this method signals that the mailbox-thread should 
(temporarily) stop invoking the default action,
+        * e.g. because there is currently no input available. This method must 
be invoked from the mailbox-thread only!
         */
-       void clearAndPut(@Nonnull Runnable priorityAction);
+       SuspendedMailboxDefaultAction suspendDefaultAction();
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxDefaultAction.java
similarity index 56%
copy from 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java
copy to 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxDefaultAction.java
index dfa8d76..21fc191 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxDefaultAction.java
@@ -16,21 +16,22 @@
  * limitations under the License.
  */
 
-package org.apache.flink.streaming.runtime.tasks.mailbox;
+package org.apache.flink.streaming.runtime.tasks.mailbox.execution;
 
-import javax.annotation.Nonnull;
+import org.apache.flink.annotation.Internal;
 
 /**
- * A mailbox is basically a blocking queue for inter-thread message exchange 
in form of {@link Runnable} objects between
- * multiple producer threads and a single consumer.
+ * Interface for the default action that is repeatedly invoked in the 
mailbox-loop.
  */
-public interface Mailbox extends MailboxReceiver, MailboxSender {
+@Internal
+public interface MailboxDefaultAction {
 
        /**
-        * The effect of this is that all pending letters are dropped and the 
given priorityAction
-        * is enqueued to the head of the mailbox.
+        * This method implements the default action of the mailbox loop (e.g. 
processing one event from the input).
+        * Implementations should (in general) be non-blocking.
         *
-        * @param priorityAction action to enqueue atomically after the mailbox 
was cleared.
+        * @param context context object for collaborative interaction between 
the default action and the mailbox loop.
+        * @throws Exception on any problems in the action.
         */
-       void clearAndPut(@Nonnull Runnable priorityAction);
+       void runDefaultAction(DefaultActionContext context) throws Exception;
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutor.java
new file mode 100644
index 0000000..1848e77
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutor.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks.mailbox.execution;
+
+import org.apache.flink.streaming.runtime.tasks.mailbox.Mailbox;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+
+/**
+ * Interface for an {@link Executor} build around a {@link Mailbox}-based 
execution model.
+ */
+public interface MailboxExecutor extends Executor {
+
+       /**
+        * Executes the given command at some time in the future in the mailbox 
thread. This call can block when the
+        * mailbox is currently full. Therefore, this method must not be called 
from the mailbox thread itself as this
+        * can cause a deadlock. Instead, if the caller is already in the 
mailbox thread, the command should just be
+        * executed directly or use the non-blocking {@link 
#tryExecute(Runnable)}.
+        *
+        * @param command the runnable task to add to the mailbox for execution.
+        * @throws RejectedExecutionException if this task cannot be accepted 
for execution, e.g. because the mailbox is
+        *                                    quiesced or closed.
+        */
+       @Override
+       void execute(@Nonnull Runnable command) throws 
RejectedExecutionException;
+
+       /**
+        * Attempts to enqueue the given command in the mailbox for execution. 
On success, the method returns true. If
+        * the mailbox is full, this method returns immediately without adding 
the command and returns false.
+        *
+        * @param command the runnable task to add to the mailbox for execution.
+        * @return true if the command was added to the mailbox. False if the 
command could not be added because the mailbox
+        * was full.
+        * @throws RejectedExecutionException if this task cannot be accepted 
for execution, e.g. because the mailbox is
+        *                                    quiesced or closed.
+        */
+       boolean tryExecute(Runnable command) throws RejectedExecutionException;
+
+       /**
+        * This methods starts running the command at the head of the mailbox 
and is intended to be used by the mailbox
+        * thread to yield from a currently ongoing action to another command. 
The method blocks until another command to
+        * run is available in the mailbox and must only be called from the 
mailbox thread. Must only be called from the
+        * mailbox thread to not violate the single-threaded execution model.
+        *
+        * @throws InterruptedException on interruption.
+        * @throws IllegalStateException if the mailbox is closed and can no 
longer supply runnables for yielding.
+        */
+       void yield() throws InterruptedException, IllegalStateException;
+
+       /**
+        * This methods attempts to run the command at the head of the mailbox. 
This is intended to be used by the mailbox
+        * thread to yield from a currently ongoing action to another command. 
The method returns true if a command was
+        * found and executed or false if the mailbox was empty. Must only be 
called from the
+        * mailbox thread to not violate the single-threaded execution model.
+        *
+        * @return true on successful yielding to another command, false if 
there was no command to yield to.
+        * @throws IllegalStateException if the mailbox is closed and can no 
longer supply runnables for yielding.
+        */
+       boolean tryYield() throws IllegalStateException;
+
+       /**
+        * Check if the current thread is the mailbox thread.
+        *
+        * @return only true if called from the mailbox thread.
+        */
+       boolean isMailboxThread();
+}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorService.java
similarity index 56%
copy from 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java
copy to 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorService.java
index dfa8d76..1c267c6 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorService.java
@@ -16,21 +16,14 @@
  * limitations under the License.
  */
 
-package org.apache.flink.streaming.runtime.tasks.mailbox;
+package org.apache.flink.streaming.runtime.tasks.mailbox.execution;
 
-import javax.annotation.Nonnull;
+import org.apache.flink.streaming.runtime.tasks.mailbox.Mailbox;
+
+import java.util.concurrent.ExecutorService;
 
 /**
- * A mailbox is basically a blocking queue for inter-thread message exchange 
in form of {@link Runnable} objects between
- * multiple producer threads and a single consumer.
+ * Interface for an {@link ExecutorService} build around a {@link 
Mailbox}-based execution model.
  */
-public interface Mailbox extends MailboxReceiver, MailboxSender {
-
-       /**
-        * The effect of this is that all pending letters are dropped and the 
given priorityAction
-        * is enqueued to the head of the mailbox.
-        *
-        * @param priorityAction action to enqueue atomically after the mailbox 
was cleared.
-        */
-       void clearAndPut(@Nonnull Runnable priorityAction);
+public interface MailboxExecutorService extends MailboxExecutor, 
ExecutorService {
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorServiceImpl.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorServiceImpl.java
new file mode 100644
index 0000000..944b7a9
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorServiceImpl.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks.mailbox.execution;
+
+import org.apache.flink.streaming.runtime.tasks.mailbox.Mailbox;
+import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxStateException;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Implementation of an executor service build around a mailbox-based 
execution model.
+ */
+public class MailboxExecutorServiceImpl extends AbstractExecutorService 
implements MailboxExecutorService {
+
+       /** Reference to the thread that executes the mailbox letters.  */
+       @Nonnull
+       private final Thread taskMailboxThread;
+
+       /** The mailbox that manages the submitted runnable objects. */
+       @Nonnull
+       private final Mailbox mailbox;
+
+       public MailboxExecutorServiceImpl(@Nonnull Mailbox mailbox) {
+               this(mailbox, Thread.currentThread());
+       }
+
+       public MailboxExecutorServiceImpl(@Nonnull Mailbox mailbox, @Nonnull 
Thread taskMailboxThread) {
+               this.mailbox = mailbox;
+               this.taskMailboxThread = taskMailboxThread;
+       }
+
+       @Override
+       public void execute(@Nonnull Runnable command) {
+               checkIsNotMailboxThread();
+               try {
+                       mailbox.putMail(command);
+               } catch (InterruptedException irex) {
+                       Thread.currentThread().interrupt();
+                       throw new RejectedExecutionException("Sender thread was 
interrupted while blocking on mailbox.", irex);
+               } catch (MailboxStateException mbex) {
+                       throw new RejectedExecutionException(mbex);
+               }
+       }
+
+       @Override
+       public boolean tryExecute(Runnable command) {
+               try {
+                       return mailbox.tryPutMail(command);
+               } catch (MailboxStateException e) {
+                       throw new RejectedExecutionException(e);
+               }
+       }
+
+       @Override
+       public void yield() throws InterruptedException, IllegalStateException {
+               checkIsMailboxThread();
+               try {
+                       Runnable runnable = mailbox.takeMail();
+                       runnable.run();
+               } catch (MailboxStateException e) {
+                       throw new IllegalStateException("Mailbox can no longer 
supply runnables for yielding.", e);
+               }
+       }
+
+       @Override
+       public boolean tryYield() throws IllegalStateException {
+               checkIsMailboxThread();
+               try {
+                       Optional<Runnable> runnableOptional = 
mailbox.tryTakeMail();
+                       if (runnableOptional.isPresent()) {
+                               runnableOptional.get().run();
+                               return true;
+                       } else {
+                               return false;
+                       }
+               } catch (MailboxStateException e) {
+                       throw new IllegalStateException("Mailbox can no longer 
supply runnables for yielding.", e);
+               }
+       }
+
+       @Override
+       public boolean isMailboxThread() {
+               return Thread.currentThread() == taskMailboxThread;
+       }
+
+       @Override
+       public void shutdown() {
+               mailbox.quiesce();
+       }
+
+       @Nonnull
+       @Override
+       public List<Runnable> shutdownNow() {
+               return mailbox.close();
+       }
+
+       @Override
+       public boolean isShutdown() {
+               return mailbox.getState() != Mailbox.State.OPEN;
+       }
+
+       @Override
+       public boolean isTerminated() {
+               return mailbox.getState() == Mailbox.State.CLOSED;
+       }
+
+       @Override
+       public boolean awaitTermination(long timeout, @Nonnull TimeUnit unit) {
+               throw new UnsupportedOperationException("This method is not 
supported by this implementation.");
+       }
+
+       /**
+        * Returns the mailbox that manages the execution order.
+        *
+        * @return the mailbox.
+        */
+       @Nonnull
+       public Mailbox getMailbox() {
+               return mailbox;
+       }
+
+       private void checkIsMailboxThread() {
+               if (!isMailboxThread()) {
+                       throw new IllegalStateException(
+                               "Illegal thread detected. This method must be 
called from inside the mailbox thread!");
+               }
+       }
+
+       private void checkIsNotMailboxThread() {
+               if (isMailboxThread()) {
+                       throw new IllegalStateException(
+                               "Illegal thread detected. This method must NOT 
be called from inside the mailbox thread!");
+               }
+       }
+}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessor.java
new file mode 100644
index 0000000..77906f7
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessor.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks.mailbox.execution;
+
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.streaming.runtime.tasks.mailbox.Mailbox;
+import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxImpl;
+import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxStateException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.WrappingRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * This class encapsulates the logic of the mailbox-based execution model. At 
the core of this model
+ * {@link #runMailboxLoop()} that continuously executes the provided {@link 
MailboxDefaultAction} in a loop. On each
+ * iteration, the method also checks if there are pending actions in the 
mailbox and executes such actions. This model
+ * ensures single-threaded execution between the default action (e.g. record 
processing) and mailbox actions (e.g.
+ * checkpoint trigger, timer firing, ...).
+ *
+ * <p>The {@link MailboxDefaultAction} interacts with this class through the 
{@link MailboxDefaultActionContext} to
+ * communicate control flow changes to the mailbox loop, e.g. that invocations 
of the default action are temporarily
+ * or permanently exhausted.
+ *
+ * <p>The design of {@link #runMailboxLoop()} is centered around the idea of 
keeping the expected hot path
+ * (default action, no mail) as fast as possible, with just a single volatile 
read per iteration in
+ * {@link Mailbox#hasMail}. This means that all checking of mail and other 
control flags (mailboxLoopRunning,
+ * suspendedDefaultAction) are always connected to #hasMail indicating true. 
This means that control flag changes in
+ * the mailbox thread can be done directly, but we must ensure that there is 
at least one action in the mailbox so that
+ * the change is picked up. For control flag changes by all other threads, 
that must happen through mailbox actions,
+ * this is automatically the case.
+ *
+ * <p>This class has a open-prepareClose-close lifecycle that is connected 
with and maps to the lifecycle of the
+ * encapsulated {@link Mailbox} (which is open-quiesce-close).
+ */
+public class MailboxProcessor {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(MailboxProcessor.class);
+
+       /** The mailbox data-structure that manages request for special 
actions, like timers, checkpoints, ... */
+       private final Mailbox mailbox;
+
+       /** Executor-style facade for client code to submit actions to the 
mailbox. */
+       private final MailboxExecutorService mailboxExecutor;
+
+       /** Action that is repeatedly executed if no action request is in the 
mailbox. Typically record processing. */
+       private final MailboxDefaultAction mailboxDefaultAction;
+
+       /** Control flag to terminate the mailbox loop. Must only be accessed 
from mailbox thread. */
+       private boolean mailboxLoopRunning;
+
+       /**
+        * Remembers a currently active suspension of the default action. 
Serves as flag to indicate a suspended
+        * default action (suspended if not-null) and to reuse the object as 
return value in consecutive suspend attempts.
+        * Must only be accessed from mailbox thread.
+        */
+       private SuspendedMailboxDefaultAction suspendedDefaultAction;
+
+       /** Special action that is used to terminate the mailbox loop. */
+       private final Runnable mailboxPoisonLetter;
+
+       public MailboxProcessor(MailboxDefaultAction mailboxDefaultAction) {
+               this.mailboxDefaultAction = 
Preconditions.checkNotNull(mailboxDefaultAction);
+               this.mailbox = new MailboxImpl();
+               this.mailboxExecutor = new MailboxExecutorServiceImpl(mailbox);
+               this.mailboxPoisonLetter = () -> mailboxLoopRunning = false;
+               this.mailboxLoopRunning = true;
+               this.suspendedDefaultAction = null;
+       }
+
+       /**
+        * Returns an executor service facade to submit actions to the mailbox.
+        */
+       public MailboxExecutorService getMailboxExecutor() {
+               return mailboxExecutor;
+       }
+
+       /**
+        * Lifecycle method to open the mailbox for action submission.
+        */
+       public void open() {
+               mailbox.open();
+       }
+
+       /**
+        * Lifecycle method to close the mailbox for action submission.
+        */
+       public void prepareClose() {
+               mailboxExecutor.shutdown();
+       }
+
+       /**
+        * Lifecycle method to close the mailbox for action 
submission/retrieval. This will cancel all instances of
+        * {@link java.util.concurrent.RunnableFuture} that are still contained 
in the mailbox.
+        */
+       public void close() {
+               
FutureUtils.cancelRunnableFutures(mailboxExecutor.shutdownNow());
+       }
+
+       /**
+        * Runs the mailbox processing loop. This is where the main work is 
done.
+        */
+       public void runMailboxLoop() throws Exception {
+
+               Preconditions.checkState(
+                       mailboxExecutor.isMailboxThread(),
+                       "Method must be executed by declared mailbox thread!");
+
+               final Mailbox localMailbox = mailbox;
+
+               assert localMailbox.getState() == Mailbox.State.OPEN : "Mailbox 
must be opened!";
+
+               final MailboxDefaultActionContext defaultActionContext = new 
MailboxDefaultActionContext(this);
+
+               while (processMail(localMailbox)) {
+                       
mailboxDefaultAction.runDefaultAction(defaultActionContext);
+               }
+       }
+
+       /**
+        * Cancels the mailbox loop execution. All pending mailbox actions will 
not be executed anymore, if they are
+        * instance of {@link java.util.concurrent.RunnableFuture}, they will 
be cancelled.
+        */
+       public void cancelMailboxExecution() {
+               clearMailboxAndRunPriorityAction(mailboxPoisonLetter);
+       }
+
+       /**
+        * Reports a throwable for rethrowing from the mailbox thread. This 
will clear and cancel all other pending letters.
+        * @param throwable to report by rethrowing from the mailbox loop.
+        */
+       public void reportThrowable(Throwable throwable) {
+               clearMailboxAndRunPriorityAction(() -> {
+                       throw new WrappingRuntimeException(throwable);
+               });
+       }
+
+       /**
+        * This method must be called to end the stream task when all actions 
for the tasks have been performed.
+        */
+       public void allActionsCompleted() {
+               try {
+                       if (mailboxExecutor.isMailboxThread()) {
+                               mailboxLoopRunning = false;
+                               ensureControlFlowSignalCheck();
+                       } else {
+                               mailbox.putFirst(mailboxPoisonLetter);
+                       }
+               } catch (InterruptedException e) {
+                       Thread.currentThread().interrupt();
+               } catch (MailboxStateException me) {
+                       LOG.debug("Action context could not submit poison 
letter to mailbox.", me);
+               }
+       }
+
+       /**
+        * This helper method handles all special actions from the mailbox. It 
returns true if the mailbox loop should
+        * continue running, false if it should stop. In the current design, 
this method also evaluates all control flag
+        * changes. This keeps the hot path in {@link #runMailboxLoop()} free 
from any other flag checking, at the cost
+        * that all flag changes must make sure that the mailbox signals 
mailbox#hasMail.
+        */
+       private boolean processMail(Mailbox mailbox) throws 
MailboxStateException, InterruptedException {
+
+               // Doing this check is an optimization to only have a volatile 
read in the expected hot path, locks are only
+               // acquired after this point.
+               if (!mailbox.hasMail()) {
+                       // We can also directly return true because all changes 
to #isMailboxLoopRunning must be connected to
+                       // mailbox.hasMail() == true.
+                       return true;
+               }
+
+               // TODO consider batched draining into list and/or limit number 
of executed letters
+               // Take letters in a non-blockingly and execute them.
+               Optional<Runnable> maybeLetter;
+               while (isMailboxLoopRunning() && (maybeLetter = 
mailbox.tryTakeMail()).isPresent()) {
+                       maybeLetter.get().run();
+               }
+
+               // If the default action is currently not available, we can run 
a blocking mailbox execution until the default
+               // action becomes available again.
+               while (isDefaultActionUnavailable() && isMailboxLoopRunning()) {
+                       Runnable letter = mailbox.takeMail();
+                       letter.run();
+               }
+
+               return isMailboxLoopRunning();
+       }
+
+       /**
+        * Calling this method signals that the mailbox-thread should 
(temporarily) stop invoking the default action,
+        * e.g. because there is currently no input available.
+        */
+       private SuspendedMailboxDefaultAction suspendDefaultAction() {
+
+               Preconditions.checkState(mailboxExecutor.isMailboxThread(), 
"Suspending must only be called from the mailbox thread!");
+
+               if (suspendedDefaultAction == null) {
+                       suspendedDefaultAction = new 
SuspendDefaultActionRunnable();
+                       ensureControlFlowSignalCheck();
+               }
+
+               return suspendedDefaultAction;
+       }
+
+       private boolean isDefaultActionUnavailable() {
+               return suspendedDefaultAction != null;
+       }
+
+       private boolean isMailboxLoopRunning() {
+               return mailboxLoopRunning;
+       }
+
+       /**
+        * Helper method to make sure that the mailbox loop will check the 
control flow flags in the next iteration.
+        */
+       private void ensureControlFlowSignalCheck() {
+               // Make sure that mailbox#hasMail is true via a dummy letter so 
that the flag change is noticed.
+               if (!mailbox.hasMail()) {
+                       try {
+                               mailbox.tryPutMail(() -> {});
+                       } catch (MailboxStateException me) {
+                               LOG.debug("Mailbox closed when trying to submit 
letter for control flow signal.", me);
+                       }
+               }
+       }
+
+       private void clearMailboxAndRunPriorityAction(Runnable priorityLetter) {
+               try {
+                       List<Runnable> droppedRunnables = 
mailbox.clearAndPut(priorityLetter);
+                       FutureUtils.cancelRunnableFutures(droppedRunnables);
+               } catch (MailboxStateException msex) {
+                       LOG.debug("Mailbox already closed in cancel().", msex);
+               }
+       }
+
+       /**
+        * Implementation of {@link DefaultActionContext} that is connected to 
a {@link MailboxProcessor}
+        * instance.
+        */
+       private static final class MailboxDefaultActionContext implements 
DefaultActionContext {
+
+               private final MailboxProcessor mailboxProcessor;
+
+               private MailboxDefaultActionContext(MailboxProcessor 
mailboxProcessor) {
+                       this.mailboxProcessor = mailboxProcessor;
+               }
+
+               @Override
+               public void allActionsCompleted() {
+                       mailboxProcessor.allActionsCompleted();
+               }
+
+               @Override
+               public SuspendedMailboxDefaultAction suspendDefaultAction() {
+                       return mailboxProcessor.suspendDefaultAction();
+               }
+       }
+
+       /**
+        * Represents the suspended state of the default action and offers an 
idempotent method to resume execution.
+        */
+       private final class SuspendDefaultActionRunnable implements 
SuspendedMailboxDefaultAction {
+
+               @Override
+               public void resume() {
+                       Preconditions.checkState(
+                               mailboxExecutor.isMailboxThread(),
+                               "SuspendedMailboxDefaultAction::resume resume 
must only be called from the mailbox-thread!");
+
+                       if (suspendedDefaultAction == this) {
+                               suspendedDefaultAction = null;
+                       }
+               }
+       }
+}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/SuspendedMailboxDefaultAction.java
similarity index 57%
copy from 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java
copy to 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/SuspendedMailboxDefaultAction.java
index dfa8d76..ca31254 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/SuspendedMailboxDefaultAction.java
@@ -16,21 +16,15 @@
  * limitations under the License.
  */
 
-package org.apache.flink.streaming.runtime.tasks.mailbox;
-
-import javax.annotation.Nonnull;
+package org.apache.flink.streaming.runtime.tasks.mailbox.execution;
 
 /**
- * A mailbox is basically a blocking queue for inter-thread message exchange 
in form of {@link Runnable} objects between
- * multiple producer threads and a single consumer.
+ * Represents the suspended state of a {@link MailboxDefaultAction}, ready to 
resume.
  */
-public interface Mailbox extends MailboxReceiver, MailboxSender {
+public interface SuspendedMailboxDefaultAction {
 
        /**
-        * The effect of this is that all pending letters are dropped and the 
given priorityAction
-        * is enqueued to the head of the mailbox.
-        *
-        * @param priorityAction action to enqueue atomically after the mailbox 
was cleared.
+        * Resume execution of the default action. Must only be called from the 
mailbox thread!.
         */
-       void clearAndPut(@Nonnull Runnable priorityAction);
+       void resume();
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java
index 1308796..c7c1440 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.streaming.api.operators.InputSelectable;
 import org.apache.flink.streaming.api.operators.InputSelection;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.flink.streaming.runtime.tasks.mailbox.execution.DefaultActionContext;
 import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.apache.flink.util.ExceptionUtils;
 
@@ -191,7 +192,7 @@ public class StreamTaskSelectiveReadingTest {
                }
 
                @Override
-               protected void performDefaultAction(ActionContext context) 
throws Exception {
+               protected void performDefaultAction(DefaultActionContext 
context) throws Exception {
                        if (!started) {
                                synchronized (this) {
                                        this.wait();
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
index 72e8a19..4f5ba03 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -79,6 +79,7 @@ import 
org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
+import 
org.apache.flink.streaming.runtime.tasks.mailbox.execution.DefaultActionContext;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
@@ -225,7 +226,7 @@ public class StreamTaskTerminationTest extends TestLogger {
                }
 
                @Override
-               protected void performDefaultAction(ActionContext context) 
throws Exception {
+               protected void performDefaultAction(DefaultActionContext 
context) throws Exception {
                        RUN_LATCH.trigger();
                        // wait until we have started an asynchronous checkpoint
                        CHECKPOINTING_LATCH.await();
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 601d72b..949a75f 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -110,6 +110,7 @@ import 
org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
+import 
org.apache.flink.streaming.runtime.tasks.mailbox.execution.DefaultActionContext;
 import org.apache.flink.util.CloseableIterable;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
@@ -828,7 +829,7 @@ public class StreamTaskTest extends TestLogger {
                protected void init() throws Exception {}
 
                @Override
-               protected void performDefaultAction(ActionContext context) 
throws Exception {
+               protected void performDefaultAction(DefaultActionContext 
context) throws Exception {
                        context.allActionsCompleted();
                }
 
@@ -1035,7 +1036,7 @@ public class StreamTaskTest extends TestLogger {
                }
 
                @Override
-               protected void performDefaultAction(ActionContext context) {
+               protected void performDefaultAction(DefaultActionContext 
context) {
                        if (isCanceled() || inputFinished) {
                                context.allActionsCompleted();
                        }
@@ -1072,7 +1073,7 @@ public class StreamTaskTest extends TestLogger {
                }
 
                @Override
-               protected void performDefaultAction(ActionContext context) 
throws Exception {
+               protected void performDefaultAction(DefaultActionContext 
context) throws Exception {
                        if (fail) {
                                throw new RuntimeException();
                        }
@@ -1160,7 +1161,7 @@ public class StreamTaskTest extends TestLogger {
                protected void init() {}
 
                @Override
-               protected void performDefaultAction(ActionContext context) 
throws Exception {
+               protected void performDefaultAction(DefaultActionContext 
context) throws Exception {
                        holder = new LockHolder(getCheckpointLock(), latch);
                        holder.start();
                        latch.await();
@@ -1205,7 +1206,7 @@ public class StreamTaskTest extends TestLogger {
                protected void init() {}
 
                @Override
-               protected void performDefaultAction(ActionContext context) 
throws Exception {
+               protected void performDefaultAction(DefaultActionContext 
context) throws Exception {
                        final OneShotLatch latch = new OneShotLatch();
                        final Object lock = new Object();
 
@@ -1271,7 +1272,7 @@ public class StreamTaskTest extends TestLogger {
                }
 
                @Override
-               protected void performDefaultAction(ActionContext context) 
throws Exception {
+               protected void performDefaultAction(DefaultActionContext 
context) throws Exception {
                        syncLatch.await();
                        super.performDefaultAction(context);
                }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
index bac9d43..af09716 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
@@ -62,6 +62,7 @@ import 
org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
+import 
org.apache.flink.streaming.runtime.tasks.mailbox.execution.DefaultActionContext;
 import org.apache.flink.util.SerializedValue;
 
 import org.junit.Rule;
@@ -151,7 +152,7 @@ public class SynchronousCheckpointITCase {
                }
 
                @Override
-               protected void performDefaultAction(ActionContext context) 
throws Exception {
+               protected void performDefaultAction(DefaultActionContext 
context) throws Exception {
                        if (!isRunning) {
                                isRunning = true;
                                eventQueue.put(Event.TASK_IS_RUNNING);
@@ -159,7 +160,7 @@ public class SynchronousCheckpointITCase {
                        if (isCanceled()) {
                                context.allActionsCompleted();
                        } else {
-                               context.actionsUnavailable();
+                               context.suspendDefaultAction();
                        }
                }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java
index 8b71423..0b36ad7 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskTest.NoOpStreamTask;
+import 
org.apache.flink.streaming.runtime.tasks.mailbox.execution.DefaultActionContext;
 
 import org.junit.Before;
 import org.junit.Test;
@@ -171,7 +172,7 @@ public class SynchronousCheckpointTest {
                }
 
                @Override
-               protected void performDefaultAction(ActionContext context) 
throws Exception {
+               protected void performDefaultAction(DefaultActionContext 
context) throws Exception {
                        runningLatch.trigger();
                        execLatch.await();
                        super.performDefaultAction(context);
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
index e40e23d..7360508 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
@@ -82,6 +82,7 @@ import 
org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.StreamFilter;
 import org.apache.flink.streaming.api.operators.StreamOperator;
+import 
org.apache.flink.streaming.runtime.tasks.mailbox.execution.DefaultActionContext;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 
@@ -474,7 +475,7 @@ public class TaskCheckpointingBehaviourTest extends 
TestLogger {
                public void init() {}
 
                @Override
-               protected void performDefaultAction(ActionContext context) 
throws Exception {
+               protected void performDefaultAction(DefaultActionContext 
context) throws Exception {
                        triggerCheckpointOnBarrier(
                                new CheckpointMetaData(
                                        11L,
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java
index 9c4edbf..1a48136 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java
@@ -18,17 +18,23 @@
 
 package org.apache.flink.streaming.runtime.tasks.mailbox;
 
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.util.function.BiConsumerWithException;
 import org.apache.flink.util.function.FunctionWithException;
+import org.apache.flink.util.function.RunnableWithException;
 import org.apache.flink.util.function.ThrowingRunnable;
 
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Optional;
 import java.util.Queue;
+import java.util.concurrent.CountDownLatch;
+import java.util.function.Consumer;
 
 /**
  * Unit tests for {@link MailboxImpl}.
@@ -36,7 +42,7 @@ import java.util.Queue;
 public class MailboxImplTest {
 
        private static final Runnable POISON_LETTER = () -> {};
-       private static final int CAPACITY_POW_2 = 1;
+       private static final int CAPACITY_POW_2 = 2;
        private static final int CAPACITY = 1 << CAPACITY_POW_2;
 
        /**
@@ -45,24 +51,77 @@ public class MailboxImplTest {
        private Mailbox mailbox;
 
        @Before
-       public void setUp() throws Exception {
+       public void setUp() {
                mailbox = new MailboxImpl(CAPACITY_POW_2);
+               mailbox.open();
+       }
+
+       @After
+       public void tearDown() {
+               mailbox.close();
        }
 
        /**
         * Test for #clearAndPut should remove other pending events and enqueue 
directly to the head of the mailbox queue.
         */
        @Test
-       public void testClearAndPut() {
+       public void testClearAndPut() throws Exception {
+
+               Runnable letterInstance = () -> {};
+
                for (int i = 0; i < CAPACITY; ++i) {
-                       Assert.assertTrue(mailbox.tryPutMail(() -> {}));
+                       Assert.assertTrue(mailbox.tryPutMail(letterInstance));
                }
 
-               mailbox.clearAndPut(POISON_LETTER);
+               List<Runnable> droppedLetters = 
mailbox.clearAndPut(POISON_LETTER);
 
                Assert.assertTrue(mailbox.hasMail());
                Assert.assertEquals(POISON_LETTER, mailbox.tryTakeMail().get());
                Assert.assertFalse(mailbox.hasMail());
+               Assert.assertEquals(CAPACITY, droppedLetters.size());
+       }
+
+       @Test
+       public void testPutAsHead() throws Exception {
+
+               Runnable instanceA = () -> {};
+               Runnable instanceB = () -> {};
+               Runnable instanceC = () -> {};
+               Runnable instanceD = () -> {};
+               Runnable instanceE = () -> {};
+
+               mailbox.putMail(instanceD);
+               mailbox.tryPutFirst(instanceC);
+               mailbox.putMail(instanceE);
+               mailbox.putFirst(instanceA);
+
+               OneShotLatch latch = new OneShotLatch();
+               Thread blockingPut = new Thread(() -> {
+                       // ensure we are full
+                       try {
+                               if (!mailbox.tryPutFirst(() -> { })) {
+                                       latch.trigger();
+
+                                       mailbox.putFirst(instanceB);
+
+                               }
+                       } catch (InterruptedException e) {
+                               Thread.currentThread().interrupt();
+                       } catch (MailboxStateException ignore) {
+                       }
+               });
+
+               blockingPut.start();
+               latch.await();
+
+               Assert.assertSame(instanceA, mailbox.takeMail());
+               blockingPut.join();
+               Assert.assertSame(instanceB, mailbox.takeMail());
+               Assert.assertSame(instanceC, mailbox.takeMail());
+               Assert.assertSame(instanceD, mailbox.takeMail());
+               Assert.assertSame(instanceE, mailbox.takeMail());
+
+               Assert.assertFalse(mailbox.tryTakeMail().isPresent());
        }
 
        @Test
@@ -111,6 +170,122 @@ public class MailboxImplTest {
        }
 
        /**
+        * Test that closing the mailbox unblocks pending accesses with correct 
exceptions.
+        */
+       @Test
+       public void testCloseUnblocks() throws InterruptedException {
+               testAllPuttingUnblocksInternal(Mailbox::close);
+               setUp();
+               testUnblocksInternal(() -> mailbox.takeMail(), Mailbox::close, 
MailboxStateException.class);
+       }
+
+       /**
+        * Test that silencing the mailbox unblocks pending accesses with 
correct exceptions.
+        */
+       @Test
+       public void testQuiesceUnblocks() throws Exception {
+               testAllPuttingUnblocksInternal(Mailbox::quiesce);
+       }
+
+       @Test
+       public void testLifeCycleQuiesce() throws Exception {
+               mailbox.putMail(() -> {});
+               mailbox.putMail(() -> {});
+               mailbox.quiesce();
+               testLifecyclePuttingInternal();
+               mailbox.takeMail();
+               Assert.assertTrue(mailbox.tryTakeMail().isPresent());
+               Assert.assertFalse(mailbox.tryTakeMail().isPresent());
+       }
+
+       @Test
+       public void testLifeCycleClose() throws Exception {
+               mailbox.close();
+               testLifecyclePuttingInternal();
+
+               try {
+                       mailbox.takeMail();
+                       Assert.fail();
+               } catch (MailboxStateException ignore) {
+               }
+
+               try {
+                       mailbox.tryTakeMail();
+                       Assert.fail();
+               } catch (MailboxStateException ignore) {
+               }
+       }
+
+       private void testLifecyclePuttingInternal() throws Exception {
+               try {
+                       mailbox.tryPutMail(() -> {});
+                       Assert.fail();
+               } catch (MailboxStateException ignore) {
+               }
+               try {
+                       mailbox.tryPutFirst(() -> {});
+                       Assert.fail();
+               } catch (MailboxStateException ignore) {
+               }
+               try {
+                       mailbox.putMail(() -> {});
+                       Assert.fail();
+               } catch (MailboxStateException ignore) {
+               }
+               try {
+                       mailbox.putFirst(() -> {});
+                       Assert.fail();
+               } catch (MailboxStateException ignore) {
+               }
+       }
+
+       private void testAllPuttingUnblocksInternal(Consumer<Mailbox> 
unblockMethod) throws InterruptedException {
+               testUnblocksInternal(() -> mailbox.putMail(() -> {}), 
unblockMethod, MailboxStateException.class);
+               setUp();
+               testUnblocksInternal(() -> mailbox.putFirst(() -> {}), 
unblockMethod, MailboxStateException.class);
+               setUp();
+               testUnblocksInternal(() -> mailbox.clearAndPut(() -> {}), 
unblockMethod, MailboxStateException.class);
+       }
+
+       private void testUnblocksInternal(
+               RunnableWithException testMethod,
+               Consumer<Mailbox> unblockMethod,
+               Class<?> expectedExceptionClass) throws InterruptedException {
+               final Thread[] blockedThreads = new Thread[CAPACITY * 2];
+               final Exception[] exceptions = new 
Exception[blockedThreads.length];
+
+               CountDownLatch countDownLatch = new 
CountDownLatch(blockedThreads.length);
+
+               for (int i = 0; i < blockedThreads.length; ++i) {
+                       final int id = i;
+                       Thread blocked = new Thread(() -> {
+                               try {
+                                       countDownLatch.countDown();
+                                       while (true) {
+                                               testMethod.run();
+                                       }
+                               } catch (Exception ex) {
+                                       exceptions[id] = ex;
+                               }
+                       });
+                       blockedThreads[i] = blocked;
+                       blocked.start();
+               }
+
+               countDownLatch.await();
+               unblockMethod.accept(mailbox);
+
+               for (Thread blockedThread : blockedThreads) {
+                       blockedThread.join();
+               }
+
+               for (Exception exception : exceptions) {
+                       Assert.assertEquals(expectedExceptionClass, 
exception.getClass());
+               }
+
+       }
+
+       /**
         * Test producer-consumer pattern through the mailbox in a concurrent 
setting (n-writer / 1-reader).
         */
        private void testPutTake(
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorServiceImplTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorServiceImplTest.java
new file mode 100644
index 0000000..92e106d
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorServiceImplTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks.mailbox.execution;
+
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxImpl;
+import org.apache.flink.util.Preconditions;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for {@link MailboxExecutorServiceImpl}.
+ */
+public class MailboxExecutorServiceImplTest {
+
+       private MailboxExecutorServiceImpl mailboxExecutorService;
+       private ExecutorService otherThreadExecutor;
+       private MailboxImpl mailbox;
+
+       @Before
+       public void setUp() throws Exception {
+               this.mailbox = new MailboxImpl();
+               this.mailbox.open();
+               this.mailboxExecutorService = new 
MailboxExecutorServiceImpl(mailbox);
+               this.otherThreadExecutor = 
Executors.newSingleThreadScheduledExecutor();
+       }
+
+       @After
+       public void tearDown() {
+               otherThreadExecutor.shutdown();
+               try {
+                       if (!otherThreadExecutor.awaitTermination(60, 
TimeUnit.SECONDS)) {
+                               otherThreadExecutor.shutdownNow();
+                               if (!otherThreadExecutor.awaitTermination(60, 
TimeUnit.SECONDS)) {
+                                       throw new IllegalStateException("Thread 
pool did not terminate on time!");
+                               }
+                       }
+               } catch (InterruptedException ie) {
+                       otherThreadExecutor.shutdownNow();
+                       Thread.currentThread().interrupt();
+               }
+       }
+
+       @Test
+       public void testOpsAndLifecycle() throws Exception {
+               Assert.assertFalse(mailboxExecutorService.isShutdown());
+               Assert.assertFalse(mailboxExecutorService.isTerminated());
+               final TestRunnable testRunnable = new TestRunnable();
+               
Assert.assertTrue(mailboxExecutorService.tryExecute(testRunnable));
+               Assert.assertEquals(testRunnable, mailbox.tryTakeMail().get());
+               CompletableFuture.runAsync(() -> 
mailboxExecutorService.execute(testRunnable), otherThreadExecutor).get();
+               Assert.assertEquals(testRunnable, mailbox.takeMail());
+               final TestRunnable yieldRun = new TestRunnable();
+               final TestRunnable leftoverRun = new TestRunnable();
+               Assert.assertTrue(mailboxExecutorService.tryExecute(yieldRun));
+               Future<?> leftoverFuture = CompletableFuture.supplyAsync(
+                       () -> mailboxExecutorService.submit(leftoverRun), 
otherThreadExecutor).get();
+               mailboxExecutorService.shutdown();
+               Assert.assertTrue(mailboxExecutorService.isShutdown());
+               Assert.assertFalse(mailboxExecutorService.isTerminated());
+
+               try {
+                       CompletableFuture.runAsync(() -> 
mailboxExecutorService.execute(testRunnable), otherThreadExecutor).get();
+                       Assert.fail("execution should not work after 
shutdown().");
+               } catch (ExecutionException expected) {
+                       Assert.assertTrue(expected.getCause() instanceof 
RejectedExecutionException);
+               }
+
+               try {
+                       CompletableFuture.runAsync(() -> 
mailboxExecutorService.tryExecute(testRunnable), otherThreadExecutor).get();
+                       Assert.fail("execution should not work after 
shutdown().");
+               } catch (ExecutionException expected) {
+                       Assert.assertTrue(expected.getCause() instanceof 
RejectedExecutionException);
+               }
+
+               Assert.assertTrue(mailboxExecutorService.tryYield());
+               Assert.assertEquals(Thread.currentThread(), 
yieldRun.wasExecutedBy());
+               Assert.assertFalse(leftoverFuture.isDone());
+
+               List<Runnable> leftoverTasks = 
mailboxExecutorService.shutdownNow();
+               Assert.assertEquals(1, leftoverTasks.size());
+               Assert.assertFalse(leftoverFuture.isCancelled());
+               FutureUtils.cancelRunnableFutures(leftoverTasks);
+               Assert.assertTrue(leftoverFuture.isCancelled());
+
+               try {
+                       mailboxExecutorService.tryYield();
+                       Assert.fail("yielding should not work after 
shutdown().");
+               } catch (IllegalStateException expected) {
+               }
+
+               try {
+                       mailboxExecutorService.yield();
+                       Assert.fail("yielding should not work after 
shutdown().");
+               } catch (IllegalStateException expected) {
+               }
+       }
+
+       @Test
+       public void testTryYield() throws Exception {
+               final TestRunnable testRunnable = new TestRunnable();
+               CompletableFuture.runAsync(() -> 
mailboxExecutorService.execute(testRunnable), otherThreadExecutor).get();
+               Assert.assertTrue(mailboxExecutorService.tryYield());
+               Assert.assertFalse(mailbox.tryTakeMail().isPresent());
+               Assert.assertEquals(Thread.currentThread(), 
testRunnable.wasExecutedBy());
+       }
+
+       @Test
+       public void testYield() throws Exception {
+               final AtomicReference<Exception> exceptionReference = new 
AtomicReference<>();
+               final TestRunnable testRunnable = new TestRunnable();
+               final Thread submitThread = new Thread(() -> {
+                       try {
+                               mailboxExecutorService.execute(testRunnable);
+                       } catch (Exception e) {
+                               exceptionReference.set(e);
+                       }
+               });
+
+               submitThread.start();
+               mailboxExecutorService.yield();
+               submitThread.join();
+
+               Assert.assertNull(exceptionReference.get());
+               Assert.assertEquals(Thread.currentThread(), 
testRunnable.wasExecutedBy());
+       }
+
+       /**
+        * Test {@link Runnable} that tracks execution.
+        */
+       static class TestRunnable implements Runnable {
+
+               private Thread executedByThread = null;
+
+               @Override
+               public void run() {
+                       Preconditions.checkState(!isExecuted(), "Runnable was 
already executed before by " + executedByThread);
+                       executedByThread = Thread.currentThread();
+               }
+
+               boolean isExecuted() {
+                       return executedByThread != null;
+               }
+
+               Thread wasExecutedBy() {
+                       return executedByThread;
+               }
+       }
+}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessorTest.java
new file mode 100644
index 0000000..80c71dd
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessorTest.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks.mailbox.execution;
+
+import org.apache.flink.core.testutils.OneShotLatch;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Unit tests for {@link MailboxProcessor}.
+ */
+public class MailboxProcessorTest {
+
+       @Test
+       public void testRejectIfNotOpen() {
+               MailboxProcessor mailboxProcessor = new MailboxProcessor((ctx) 
-> {});
+               try {
+                       mailboxProcessor.getMailboxExecutor().tryExecute(() -> 
{});
+                       Assert.fail("Should not be able to accept runnables if 
not opened.");
+               } catch (RejectedExecutionException expected) {
+               }
+       }
+
+       @Test
+       public void testShutdown() {
+               MailboxProcessor mailboxProcessor = new MailboxProcessor((ctx) 
-> {});
+               FutureTask<Void> testRunnableFuture = new FutureTask<>(() -> 
{}, null);
+               mailboxProcessor.open();
+               
mailboxProcessor.getMailboxExecutor().tryExecute(testRunnableFuture);
+               mailboxProcessor.prepareClose();
+
+               try {
+                       mailboxProcessor.getMailboxExecutor().tryExecute(() -> 
{});
+                       Assert.fail("Should not be able to accept runnables if 
not opened.");
+               } catch (RejectedExecutionException expected) {
+               }
+
+               Assert.assertFalse(testRunnableFuture.isDone());
+
+               mailboxProcessor.close();
+               Assert.assertTrue(testRunnableFuture.isCancelled());
+       }
+
+       @Test
+       public void testRunDefaultActionAndLetters() throws Exception {
+               AtomicBoolean stop = new AtomicBoolean(false);
+               MailboxThread mailboxThread = new MailboxThread() {
+                       @Override
+                       public void runDefaultAction(DefaultActionContext 
context) throws Exception {
+                               if (stop.get()) {
+                                       context.allActionsCompleted();
+                               } else {
+                                       Thread.sleep(10L);
+                               }
+                       }
+               };
+
+               MailboxProcessor mailboxProcessor = start(mailboxThread);
+               mailboxProcessor.getMailboxExecutor().execute(() -> 
stop.set(true));
+               stop(mailboxThread);
+       }
+
+       @Test
+       public void testRunDefaultAction() throws Exception {
+
+               final int expectedInvocations = 3;
+               final AtomicInteger counter = new AtomicInteger(0);
+               MailboxThread mailboxThread = new MailboxThread() {
+                       @Override
+                       public void runDefaultAction(DefaultActionContext 
context) {
+                               if (counter.incrementAndGet() == 
expectedInvocations) {
+                                       context.allActionsCompleted();
+                               }
+                       }
+               };
+
+               start(mailboxThread);
+               stop(mailboxThread);
+               Assert.assertEquals(expectedInvocations, counter.get());
+       }
+
+       @Test
+       public void testSignalUnAvailable() throws Exception {
+
+               final AtomicInteger counter = new AtomicInteger(0);
+               final AtomicReference<SuspendedMailboxDefaultAction> 
suspendedActionRef = new AtomicReference<>();
+               final OneShotLatch actionSuspendedLatch = new OneShotLatch();
+               final int blockAfterInvocations = 3;
+               final int totalInvocations = blockAfterInvocations * 2;
+
+               MailboxThread mailboxThread = new MailboxThread() {
+                       @Override
+                       public void runDefaultAction(DefaultActionContext 
context) {
+                               if (counter.incrementAndGet() == 
blockAfterInvocations) {
+                                       
suspendedActionRef.set(context.suspendDefaultAction());
+                                       actionSuspendedLatch.trigger();
+                               } else if (counter.get() == totalInvocations) {
+                                       context.allActionsCompleted();
+                               }
+                       }
+               };
+
+               start(mailboxThread);
+               actionSuspendedLatch.await();
+               Assert.assertEquals(blockAfterInvocations, counter.get());
+
+               suspendedActionRef.get().resume();
+               stop(mailboxThread);
+               Assert.assertEquals(totalInvocations, counter.get());
+       }
+
+       @Test
+       public void testSignalUnAvailablePingPong() throws Exception {
+               final AtomicReference<SuspendedMailboxDefaultAction> 
suspendedActionRef = new AtomicReference<>();
+               final int totalSwitches = 10000;
+               final MailboxThread mailboxThread = new MailboxThread() {
+                       int count = 0;
+
+                       @Override
+                       public void runDefaultAction(DefaultActionContext 
context) {
+
+                               // If this is violated, it means that the 
default action was invoked while we assumed suspension
+                               
Assert.assertTrue(suspendedActionRef.compareAndSet(null, 
context.suspendDefaultAction()));
+
+                               ++count;
+
+                               if (count == totalSwitches) {
+                                       context.allActionsCompleted();
+                               } else if (count % 1000 == 0) {
+                                       try {
+                                               Thread.sleep(1L);
+                                       } catch (InterruptedException e) {
+                                               
Thread.currentThread().interrupt();
+                                       }
+                               }
+                       }
+               };
+
+               mailboxThread.start();
+               final MailboxProcessor mailboxProcessor = 
mailboxThread.getMailboxProcessor();
+               mailboxProcessor.open();
+
+               final Thread asyncUnblocker = new Thread(() -> {
+                       int count = 0;
+                       while (!Thread.currentThread().isInterrupted()) {
+
+                               final SuspendedMailboxDefaultAction resume =
+                                       suspendedActionRef.getAndSet(null);
+                               if (resume != null) {
+                                       resume.resume();
+                               } else {
+                                       try {
+                                               
mailboxProcessor.getMailboxExecutor().execute(() -> { });
+                                       } catch (RejectedExecutionException 
ignore) {
+                                       }
+                               }
+
+                               ++count;
+                               if (count % 5000 == 0) {
+                                       try {
+                                               Thread.sleep(1L);
+                                       } catch (InterruptedException e) {
+                                               
Thread.currentThread().interrupt();
+                                       }
+                               }
+                       }
+               });
+
+               asyncUnblocker.start();
+               mailboxThread.signalStart();
+               mailboxThread.join();
+               asyncUnblocker.interrupt();
+               asyncUnblocker.join();
+               mailboxProcessor.prepareClose();
+               mailboxProcessor.close();
+               mailboxThread.checkException();
+       }
+
+       private static MailboxProcessor start(MailboxThread mailboxThread) {
+               mailboxThread.start();
+               final MailboxProcessor mailboxProcessor = 
mailboxThread.getMailboxProcessor();
+               mailboxProcessor.open();
+               mailboxThread.signalStart();
+               return mailboxProcessor;
+       }
+
+       private static void stop(MailboxThread mailboxThread) throws Exception {
+               mailboxThread.join();
+               MailboxProcessor mailboxProcessor = 
mailboxThread.getMailboxProcessor();
+               mailboxProcessor.prepareClose();
+               mailboxProcessor.close();
+               mailboxThread.checkException();
+       }
+
+       static class MailboxThread extends Thread implements 
MailboxDefaultAction {
+
+               MailboxProcessor mailboxProcessor;
+               OneShotLatch mailboxCreatedLatch = new OneShotLatch();
+               OneShotLatch canRun = new OneShotLatch();
+               private Throwable caughtException;
+
+               @Override
+               public final void run() {
+                       mailboxProcessor = new MailboxProcessor(this);
+                       mailboxCreatedLatch.trigger();
+                       try {
+                               canRun.await();
+                               mailboxProcessor.runMailboxLoop();
+                       } catch (Throwable t) {
+                               this.caughtException = t;
+                       }
+               }
+
+               @Override
+               public void runDefaultAction(DefaultActionContext context) 
throws Exception {
+                       context.allActionsCompleted();
+               }
+
+               final MailboxProcessor getMailboxProcessor() {
+                       try {
+                               mailboxCreatedLatch.await();
+                               return mailboxProcessor;
+                       } catch (InterruptedException e) {
+                               Thread.currentThread().interrupt();
+                               throw new RuntimeException(e);
+                       }
+               }
+
+               final void signalStart() {
+                       if (mailboxCreatedLatch.isTriggered()) {
+                               canRun.trigger();
+                       }
+               }
+
+               void checkException() throws Exception {
+                       if (caughtException != null) {
+                               throw new Exception(caughtException);
+                       }
+               }
+       }
+
+}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/TestMailboxExecutor.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/TestMailboxExecutor.java
new file mode 100644
index 0000000..31126a8
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/TestMailboxExecutor.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks.mailbox.execution;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.RejectedExecutionException;
+
+/**
+ * Dummy implementation of {@link MailboxExecutor} for testing.
+ */
+public class TestMailboxExecutor implements MailboxExecutor {
+
+       private final Object lock;
+
+       public TestMailboxExecutor(Object lock) {
+               this.lock = lock;
+       }
+
+       public TestMailboxExecutor() {
+               this(new Object());
+       }
+
+       @Override
+       public void execute(@Nonnull Runnable command) throws 
RejectedExecutionException {
+               synchronized (lock) {
+                       command.run();
+                       lock.notifyAll();
+               }
+       }
+
+       @Override
+       public boolean tryExecute(Runnable command) {
+               execute(command);
+               return true;
+       }
+
+       @Override
+       public void yield() throws InterruptedException {
+               synchronized (lock) {
+                       lock.wait(1);
+               }
+       }
+
+       @Override
+       public boolean tryYield() {
+               return false;
+       }
+
+       @Override
+       public boolean isMailboxThread() {
+               return true;
+       }
+}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java
index 37e7328..00a1641 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import 
org.apache.flink.streaming.runtime.tasks.mailbox.execution.DefaultActionContext;
 
 import java.util.Map;
 import java.util.function.BiConsumer;
@@ -81,7 +82,7 @@ public class MockStreamTask extends StreamTask {
        public void init() { }
 
        @Override
-       protected void performDefaultAction(ActionContext context) throws 
Exception {
+       protected void performDefaultAction(DefaultActionContext context) 
throws Exception {
                context.allActionsCompleted();
        }
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
 
b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
index d54ec1f..7dd07db 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
@@ -37,6 +37,7 @@ import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguratio
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskTest.NoOpStreamTask;
+import 
org.apache.flink.streaming.runtime.tasks.mailbox.execution.DefaultActionContext;
 import org.apache.flink.test.util.AbstractTestBase;
 
 import org.junit.Assume;
@@ -286,7 +287,7 @@ public class JobMasterStopWithSavepointIT extends 
AbstractTestBase {
                }
 
                @Override
-               protected void performDefaultAction(ActionContext context) 
throws Exception {
+               protected void performDefaultAction(DefaultActionContext 
context) throws Exception {
                        final long taskIndex = 
getEnvironment().getTaskInfo().getIndexOfThisSubtask();
                        if (taskIndex == 0) {
                                numberOfRestarts.countDown();
@@ -343,7 +344,7 @@ public class JobMasterStopWithSavepointIT extends 
AbstractTestBase {
                }
 
                @Override
-               protected void performDefaultAction(ActionContext context) 
throws Exception {
+               protected void performDefaultAction(DefaultActionContext 
context) throws Exception {
                        invokeLatch.countDown();
                        finishLatch.await();
                        context.allActionsCompleted();

Reply via email to