StefanRRichter commented on a change in pull request #8692: [FLINK-12804] 
Introduce mailbox-based ExecutorService
URL: https://github.com/apache/flink/pull/8692#discussion_r293338739
 
 

 ##########
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java
 ##########
 @@ -20,17 +20,85 @@
 
 import javax.annotation.Nonnull;
 
+import java.util.List;
+
 /**
  * A mailbox is basically a blocking queue for inter-thread message exchange 
in form of {@link Runnable} objects between
- * multiple producer threads and a single consumer.
+ * multiple producer threads and a single consumer. This has a lifecycle of 
closed -> open -> (quiesced) -> closed.
  */
 public interface Mailbox extends MailboxReceiver, MailboxSender {
 
        /**
-        * The effect of this is that all pending letters are dropped and the 
given priorityAction
-        * is enqueued to the head of the mailbox.
+        * This enum represents the states of the mailbox lifecycle.
+        */
+       enum State {
+               OPEN, QUIESCED, CLOSED
+       }
+
+       /**
+        * Open the mailbox. In this state, the mailbox supports put and take 
operations.
+        */
+       void open();
+
+       /**
+        * Quiesce the mailbox. In this state, the mailbox supports only take 
operations and all pending and future put
+        * operations will throw {@link MailboxStateException}.
+        */
+       void quiesce();
+
+       /**
+        * Close the mailbox. In this state, all pending and future put 
operations and all pending and future take
+        * operations will throw {@link MailboxStateException}. Returns all 
letters that were still enqueued.
+        *
+        * @return list with all letters that were enqueued in the mailbox at 
the time of closing.
+        */
+       @Nonnull
+       List<Runnable> close();
+
+       /**
+        * The effect of this is that all pending letters in the mailbox are 
dropped and the given priorityLetter
+        * is enqueued to the head of the mailbox. Dropped letters are 
returned. This method should only be invoked
+        * by code that has ownership of the mailbox object and only rarely 
used, e.g. to submit special events like
+        * shutting down the mailbox loop.
+        *
+        * @param priorityLetter action to enqueue atomically after the mailbox 
was cleared.
+        * @throws MailboxStateException if the mailbox is quiesced or closed.
+        */
+       @Nonnull
+       List<Runnable> clearAndPut(@Nonnull Runnable priorityLetter) throws 
MailboxStateException;
+
+       /**
+        * Adds the given action to the head of the mailbox. This method will 
block if the mailbox is full and
+        * should therefore only be called from outside the mailbox main-thread 
to avoid deadlocks.
+        *
+        * @param priorityLetter action to enqueue to the head of the mailbox.
+        * @throws InterruptedException on interruption.
+        * @throws MailboxStateException if the mailbox is quiesced or closed.
+        */
+       void putFirst(@Nonnull Runnable priorityLetter) throws 
InterruptedException, MailboxStateException;
 
 Review comment:
   I was thinking about that and decided against it for a couple of reasons. 
First, overhead is introduced even for callers that do not care about 
completion, cancellation, etc., which seems to be most cases. Second, you can 
always build this on top, and the user-facing facade of the executor 
service already has methods that return `Future`. If you wonder why not 
`CompletableFuture` instead, it is because users can always create such 
futures themselves via `CompletableFuture.runAsync(Runnable, Executor)`. 
However, there is a downside to this.
   Such calls enqueue a special instance of `RunnableFuture` under the hood, 
but calling cancel on that instance is never propagated back to the 
`CompletableFuture`. In my opinion, that is a general shortcoming of this 
implementation in the JDK. So, if you also want to propagate cancellation to 
completable futures, you have to remember all of them, in parallel to the 
`Runnable`s, because they are not `Runnable` themselves. So many options are 
already possible with this interface, which does not impose return values on 
all clients:
   - no feedback
   - Future
   - completable future via `runAsync`
   - your own feedback mechanism if you submit instances of `Runnable(Future)`
   
   Given this exhaustive list of options, I would like to keep the base 
interface simple.
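
   As a rough sketch of how these options can sit on top of a `Runnable`-only 
interface: the `QueueExecutor` below is a hypothetical stand-in for the mailbox 
(it is not the `Mailbox` from this PR), and `removeAll()` is an invented helper 
that only loosely mirrors `close()` returning the pending letters. It assumes a 
single thread that both submits and consumes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

public class MailboxFeedbackSketch {

    /** Hypothetical stand-in for the mailbox: it accepts plain Runnables only. */
    static final class QueueExecutor implements Executor {
        private final List<Runnable> letters = new ArrayList<>();

        @Override
        public void execute(Runnable letter) {
            letters.add(letter);
        }

        /** Removes and returns all queued letters without running them. */
        List<Runnable> removeAll() {
            List<Runnable> removed = new ArrayList<>(letters);
            letters.clear();
            return removed;
        }
    }

    /** Submits one letter per feedback style, then runs them in order. */
    public static String runAll() {
        QueueExecutor mailbox = new QueueExecutor();
        StringBuilder log = new StringBuilder();

        // No feedback: a plain Runnable.
        mailbox.execute(() -> log.append("plain,"));

        // Future: the caller wraps its work in a FutureTask and keeps the handle.
        FutureTask<String> task = new FutureTask<>(() -> {
            log.append("future,");
            return "ok";
        });
        mailbox.execute(task);

        // CompletableFuture: runAsync enqueues a JDK-internal Runnable.
        CompletableFuture<Void> cf =
            CompletableFuture.runAsync(() -> log.append("completable"), mailbox);

        // Play the single consumer and run the letters on this thread.
        mailbox.removeAll().forEach(Runnable::run);
        return task.isDone() && cf.isDone() ? log.toString() : "incomplete";
    }

    /** Drops the letters instead of running them and cancels what it can. */
    public static boolean[] dropAll() {
        QueueExecutor mailbox = new QueueExecutor();
        FutureTask<String> task = new FutureTask<>(() -> "never computed");
        mailbox.execute(task);
        CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> { }, mailbox);

        for (Runnable letter : mailbox.removeAll()) {
            if (letter instanceof Future) {
                ((Future<?>) letter).cancel(false);
            }
        }
        // The FutureTask observes the cancellation; the CompletableFuture stays
        // incomplete because nothing ever completed or cancelled cf itself.
        return new boolean[] {task.isCancelled(), cf.isDone()};
    }

    public static void main(String[] args) {
        System.out.println(runAll());
        boolean[] dropped = dropAll();
        System.out.println("task cancelled: " + dropped[0] + ", cf done: " + dropped[1]);
    }
}
```

   In `dropAll()`, the `FutureTask` ends up cancelled while the 
`CompletableFuture` is still not done, so whoever drops letters would need a 
separate reference to each `CompletableFuture` in order to cancel it.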

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
