pnowojski commented on a change in pull request #12401:
URL: https://github.com/apache/flink/pull/12401#discussion_r432370071



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImpl.java
##########
@@ -43,10 +43,22 @@
 
        private final StreamTaskActionExecutor actionExecutor;
 
+       private MailboxProcessor mailboxProcessor;

Review comment:
       `@Nullable` - but check other comments first (for a better solution)

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
##########
@@ -114,6 +114,7 @@ public MailboxProcessor(
                this.actionExecutor = 
Preconditions.checkNotNull(actionExecutor);
                this.mailbox = Preconditions.checkNotNull(mailbox);
                this.mainMailboxExecutor = 
Preconditions.checkNotNull(mainMailboxExecutor);
+               ((MailboxExecutorImpl) 
this.mainMailboxExecutor).setMailboxProcessor(this);

Review comment:
       Couple of problems:
   1. it changes an object passed from the outside
   2. requires cast (could be solved by changing the signature of the 
constructor)
   
   I'm thinking that maybe a better solution is to get rid of the 
`mainMailboxExecutor` and replace getter `getMainMailboxExecutor` with:
   ```
        public MailboxExecutor getMainMailboxExecutor() {
                return new MailboxExecutorImpl(mailbox, MIN_PRIORITY, 
actionExecutor, this);
        }
   ```
   +
   cache access to `mailboxProcessor.getMainMailboxExecutor()` inside 
`StreamTask` by creating a field in ` StreamTask`: `private final 
MailboxExecutor mainMailboxExecutor`?
   
   This would solve the problem of circular dependency and nullable non final 
field inside `MailboxExecutorImpl`.
   
   WDYT?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImpl.java
##########
@@ -43,10 +43,22 @@
 
        private final StreamTaskActionExecutor actionExecutor;
 
+       private MailboxProcessor mailboxProcessor;
+
        public MailboxExecutorImpl(@Nonnull TaskMailbox mailbox, int priority, 
StreamTaskActionExecutor actionExecutor) {
+               this(mailbox, priority, actionExecutor, null);
+       }
+
+       public MailboxExecutorImpl(@Nonnull TaskMailbox mailbox, int priority, 
StreamTaskActionExecutor actionExecutor, MailboxProcessor mailboxProcessor) {
                this.mailbox = mailbox;
                this.priority = priority;
                this.actionExecutor = 
Preconditions.checkNotNull(actionExecutor);
+               this.mailboxProcessor = mailboxProcessor;
+       }
+
+       public boolean isIdle() {
+               return !mailboxProcessor.isMailboxLoopRunning() ||
+                       (mailboxProcessor.isDefaultActionUnavailable() && 
!mailbox.hasMail() && mailbox.getState().isAcceptingMails());

Review comment:
       `mailboxProcessor != null && 
mailboxProcessor.isDefaultActionUnavailable()`?  - but check other comments 
first (for a better solution)

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
##########
@@ -311,17 +312,19 @@ private void enqueueProcessRecord() {
        }
 
        private void processRecord() throws IOException {
-               if (!state.prepareToProcessRecord(this)) {
-                       return;
-               }
+               do {
+                       if (!state.prepareToProcessRecord(this)) {
+                               return;
+                       }
 
-               readAndCollectRecord();
+                       readAndCollectRecord();
 
-               if (format.reachedEnd()) {
-                       onSplitProcessed();
-               } else {
-                       enqueueProcessRecord();
-               }
+                       if (format.reachedEnd()) {
+                               onSplitProcessed();
+                               return;
+                       }
+               } while (executor.isIdle()); // todo: consider moving this loop 
into MailboxProcessor (return boolean "re-execute" from enqueued action)

Review comment:
       nit: I would be slightly in favour of adding this method to the 
`MailboxExecutor` interface, but if you think opposite let it be for now.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImpl.java
##########
@@ -50,7 +50,7 @@ public MailboxExecutorImpl(@Nonnull TaskMailbox mailbox, int 
priority, StreamTas
        }
 
        public MailboxExecutorImpl(@Nonnull TaskMailbox mailbox, int priority, 
StreamTaskActionExecutor actionExecutor, MailboxProcessor mailboxProcessor) {
-               this.mailbox = mailbox;
+               this.mailbox = (TaskMailboxImpl) mailbox;

Review comment:
       revert this change?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to