This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ad998f8759932d33a19a621e943a09e420ce8828
Author: Roman Khachatryan <khachatryan.ro...@gmail.com>
AuthorDate: Fri May 29 18:10:22 2020 +0200

    [FLINK-16057][task] Optimize ContinuousFileReaderOperator
    
    The current approach of re-enqueuing mails creates an overhead
    visible in benchmarks. This change eliminates unnecessary
    re-enqueuing of mails by checking mailboxExecutor.isIdle().
---
 .../source/ContinuousFileReaderOperator.java       | 25 ++++++++++++----------
 .../runtime/tasks/mailbox/MailboxExecutorImpl.java | 13 +++++++++++
 .../runtime/tasks/mailbox/MailboxProcessor.java    |  2 +-
 .../runtime/tasks/mailbox/TaskMailbox.java         | 11 +++++++++-
 4 files changed, 38 insertions(+), 13 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index c53049a..87a028c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -37,6 +37,7 @@ import 
org.apache.flink.streaming.api.operators.StreamSourceContexts;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.RunnableWithException;
@@ -205,7 +206,7 @@ public class ContinuousFileReaderOperator<OUT, T extends 
TimestampedInputSplit>
 
        private transient InputFormat<OUT, ? super T> format;
        private TypeSerializer<OUT> serializer;
-       private transient MailboxExecutor executor;
+       private transient MailboxExecutorImpl executor;
        private transient OUT reusedRecord;
        private transient SourceFunction.SourceContext<OUT> sourceContext;
        private transient ListState<T> checkpointedState;
@@ -233,7 +234,7 @@ public class ContinuousFileReaderOperator<OUT, T extends 
TimestampedInputSplit>
 
                this.format = checkNotNull(format);
                this.processingTimeService = 
checkNotNull(processingTimeService);
-               this.executor = checkNotNull(mailboxExecutor);
+               this.executor = (MailboxExecutorImpl) 
checkNotNull(mailboxExecutor);
        }
 
        @Override
@@ -311,17 +312,19 @@ public class ContinuousFileReaderOperator<OUT, T extends 
TimestampedInputSplit>
        }
 
        private void processRecord() throws IOException {
-               if (!state.prepareToProcessRecord(this)) {
-                       return;
-               }
+               do {
+                       if (!state.prepareToProcessRecord(this)) {
+                               return;
+                       }
 
-               readAndCollectRecord();
+                       readAndCollectRecord();
 
-               if (format.reachedEnd()) {
-                       onSplitProcessed();
-               } else {
-                       enqueueProcessRecord();
-               }
+                       if (format.reachedEnd()) {
+                               onSplitProcessed();
+                               return;
+                       }
+               } while (executor.isIdle()); // todo: consider moving this loop 
into MailboxProcessor (return boolean "re-execute" from enqueued action)
+               enqueueProcessRecord();
        }
 
        private void onSplitProcessed() throws IOException {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImpl.java
index 2f99bec..48b57d8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImpl.java
@@ -43,10 +43,22 @@ public final class MailboxExecutorImpl implements 
MailboxExecutor {
 
        private final StreamTaskActionExecutor actionExecutor;
 
+       private final MailboxProcessor mailboxProcessor;
+
        public MailboxExecutorImpl(@Nonnull TaskMailbox mailbox, int priority, 
StreamTaskActionExecutor actionExecutor) {
+               this(mailbox, priority, actionExecutor, null);
+       }
+
+       public MailboxExecutorImpl(@Nonnull TaskMailbox mailbox, int priority, 
StreamTaskActionExecutor actionExecutor, MailboxProcessor mailboxProcessor) {
                this.mailbox = mailbox;
                this.priority = priority;
                this.actionExecutor = 
Preconditions.checkNotNull(actionExecutor);
+               this.mailboxProcessor = mailboxProcessor;
+       }
+
+       public boolean isIdle() {
+               return !mailboxProcessor.isMailboxLoopRunning() ||
+                       (mailboxProcessor.isDefaultActionUnavailable() && 
!mailbox.hasMail() && mailbox.getState().isAcceptingMails());
        }
 
        @Override
@@ -85,4 +97,5 @@ public final class MailboxExecutorImpl implements 
MailboxExecutor {
                        return false;
                }
        }
+
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
index a955c2b..2180971 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
@@ -116,7 +116,7 @@ public class MailboxProcessor implements Closeable {
         * @param priority the priority of the {@link MailboxExecutor}.
         */
        public MailboxExecutor getMailboxExecutor(int priority) {
-               return new MailboxExecutorImpl(mailbox, priority, 
actionExecutor);
+               return new MailboxExecutorImpl(mailbox, priority, 
actionExecutor, this);
        }
 
        public void initMetric(TaskMetricGroup metricGroup) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailbox.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailbox.java
index 5e9fb85..8928525 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailbox.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailbox.java
@@ -173,7 +173,16 @@ public interface TaskMailbox {
         * This enum represents the states of the mailbox lifecycle.
         */
        enum State {
-               OPEN, QUIESCED, CLOSED
+               OPEN(true), QUIESCED(false), CLOSED(false);
+               private final boolean acceptingMails;
+
+               State(boolean acceptingMails) {
+                       this.acceptingMails = acceptingMails;
+               }
+
+               public boolean isAcceptingMails() {
+                       return acceptingMails;
+               }
        }
 
        /**

Reply via email to