StefanRRichter commented on a change in pull request #8692: [FLINK-12804]
Introduce mailbox-based ExecutorService
URL: https://github.com/apache/flink/pull/8692#discussion_r293338739
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java
##########
@@ -20,17 +20,85 @@
import javax.annotation.Nonnull;
+import java.util.List;
+
/**
* A mailbox is basically a blocking queue for inter-thread message exchange
in form of {@link Runnable} objects between
- * multiple producer threads and a single consumer.
+ * multiple producer threads and a single consumer. This has a lifecycle of
closed -> open -> (quiesced) -> closed.
*/
public interface Mailbox extends MailboxReceiver, MailboxSender {
/**
- * The effect of this is that all pending letters are dropped and the
given priorityAction
- * is enqueued to the head of the mailbox.
+ * This enum represents the states of the mailbox lifecycle.
+ */
+ enum State {
+ OPEN, QUIESCED, CLOSED
+ }
+
+ /**
+ * Open the mailbox. In this state, the mailbox supports put and take
operations.
+ */
+ void open();
+
+ /**
+ * Quiesce the mailbox. In this state, the mailbox supports only take
operations and all pending and future put
+ * operations will throw {@link MailboxStateException}.
+ */
+ void quiesce();
+
+ /**
+ * Close the mailbox. In this state, all pending and future put
operations and all pending and future take
+ * operations will throw {@link MailboxStateException}. Returns all
letters that were still enqueued.
+ *
+ * @return list with all letters that where enqueued in the mailbox at
the time of closing.
+ */
+ @Nonnull
+ List<Runnable> close();
+
+ /**
+ * The effect of this is that all pending letters in the mailbox are
dropped and the given priorityLetter
+ * is enqueued to the head of the mailbox. Dropped letters are
returned. This method should only be invoked
+ * by code that has ownership of the mailbox object and only rarely
used, e.g. to submit special events like
+ * shutting down the mailbox loop.
+ *
+ * @param priorityLetter action to enqueue atomically after the mailbox
was cleared.
+ * @throws MailboxStateException if the mailbox is quiesced or closed.
+ */
+ @Nonnull
+ List<Runnable> clearAndPut(@Nonnull Runnable priorityLetter) throws
MailboxStateException;
+
+ /**
+ * Adds the given action to the head of the mailbox. This method will
block if the mailbox is full and
+ * should therefore only be called from outside the mailbox main-thread
to avoid deadlocks.
+ *
+ * @param priorityLetter action to enqueue to the head of the mailbox.
+ * @throws InterruptedException on interruption.
+ * @throws MailboxStateException if the mailbox is quiesced or closed.
+ */
+ void putFirst(@Nonnull Runnable priorityLetter) throws
InterruptedException, MailboxStateException;
Review comment:
I was thinking about that and decided against it for a couple of reasons.
First, overhead is introduced even for caller that do not care about
completion, cancelation etc., which seems to be most cases. Second, you can
always build this on top and also the user-facing fassade of the executor
service has methods that return `Future`. If you wonder why not
`CompletableFuture` instead, it is because the user can always create such
futures themselves via `CompletableFuture.runAsync(Runnable, Executor)`.
However, there is a downside to this.
Such calls enqueue a special instance of `RunnableFuture` under the hood,
but calling cancel on them is never propagated back to the `CompletableFuture`.
That is just a general shortcoming of this implementation in the JDK in my
opinion. So, if you want to also propagate cancellation to completable futures,
that would mean you have to remember all of them, in parallel to the
`Runnable`s, because they are not `Runnable`. So there are many options already
possible with this interface that does not impose return values on all clients:
- no feedback
- Future
- compleable future via `runAsync`
- your own feedback mechanism if you submit instances of `Runnable(Future)`
Given this exhaustive list of options, I would like to keep the base
interface simple.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services