This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit ad998f8759932d33a19a621e943a09e420ce8828 Author: Roman Khachatryan <khachatryan.ro...@gmail.com> AuthorDate: Fri May 29 18:10:22 2020 +0200 [FLINK-16057][task] Optimize ContinuousFileReaderOperator The current approach of re-enqueueing mails creates overhead that is visible in benchmarks. This change eliminates unnecessary re-enqueueing of mails by checking mailboxExecutor.isIdle() and continuing to process records in the current mail while it returns true. --- .../source/ContinuousFileReaderOperator.java | 25 ++++++++++++---------- .../runtime/tasks/mailbox/MailboxExecutorImpl.java | 13 +++++++++++ .../runtime/tasks/mailbox/MailboxProcessor.java | 2 +- .../runtime/tasks/mailbox/TaskMailbox.java | 11 +++++++++- 4 files changed, 38 insertions(+), 13 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java index c53049a..87a028c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java @@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.operators.StreamSourceContexts; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.util.function.RunnableWithException; @@ -205,7 +206,7 @@ public class ContinuousFileReaderOperator<OUT, T extends TimestampedInputSplit> private transient InputFormat<OUT, ?
super T> format; private TypeSerializer<OUT> serializer; - private transient MailboxExecutor executor; + private transient MailboxExecutorImpl executor; private transient OUT reusedRecord; private transient SourceFunction.SourceContext<OUT> sourceContext; private transient ListState<T> checkpointedState; @@ -233,7 +234,7 @@ public class ContinuousFileReaderOperator<OUT, T extends TimestampedInputSplit> this.format = checkNotNull(format); this.processingTimeService = checkNotNull(processingTimeService); - this.executor = checkNotNull(mailboxExecutor); + this.executor = (MailboxExecutorImpl) checkNotNull(mailboxExecutor); } @Override @@ -311,17 +312,19 @@ public class ContinuousFileReaderOperator<OUT, T extends TimestampedInputSplit> } private void processRecord() throws IOException { - if (!state.prepareToProcessRecord(this)) { - return; - } + do { + if (!state.prepareToProcessRecord(this)) { + return; + } - readAndCollectRecord(); + readAndCollectRecord(); - if (format.reachedEnd()) { - onSplitProcessed(); - } else { - enqueueProcessRecord(); - } + if (format.reachedEnd()) { + onSplitProcessed(); + return; + } + } while (executor.isIdle()); // todo: consider moving this loop into MailboxProcessor (return boolean "re-execute" from enqueued action) + enqueueProcessRecord(); } private void onSplitProcessed() throws IOException { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImpl.java index 2f99bec..48b57d8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImpl.java @@ -43,10 +43,22 @@ public final class MailboxExecutorImpl implements MailboxExecutor { private final StreamTaskActionExecutor actionExecutor; + private final 
MailboxProcessor mailboxProcessor; + public MailboxExecutorImpl(@Nonnull TaskMailbox mailbox, int priority, StreamTaskActionExecutor actionExecutor) { + this(mailbox, priority, actionExecutor, null); + } + + public MailboxExecutorImpl(@Nonnull TaskMailbox mailbox, int priority, StreamTaskActionExecutor actionExecutor, MailboxProcessor mailboxProcessor) { this.mailbox = mailbox; this.priority = priority; this.actionExecutor = Preconditions.checkNotNull(actionExecutor); + this.mailboxProcessor = mailboxProcessor; + } + + public boolean isIdle() { + return !mailboxProcessor.isMailboxLoopRunning() || + (mailboxProcessor.isDefaultActionUnavailable() && !mailbox.hasMail() && mailbox.getState().isAcceptingMails()); } @Override @@ -85,4 +97,5 @@ public final class MailboxExecutorImpl implements MailboxExecutor { return false; } } + } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java index a955c2b..2180971 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java @@ -116,7 +116,7 @@ public class MailboxProcessor implements Closeable { * @param priority the priority of the {@link MailboxExecutor}. 
*/ public MailboxExecutor getMailboxExecutor(int priority) { - return new MailboxExecutorImpl(mailbox, priority, actionExecutor); + return new MailboxExecutorImpl(mailbox, priority, actionExecutor, this); } public void initMetric(TaskMetricGroup metricGroup) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailbox.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailbox.java index 5e9fb85..8928525 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailbox.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailbox.java @@ -173,7 +173,16 @@ public interface TaskMailbox { * This enum represents the states of the mailbox lifecycle. */ enum State { - OPEN, QUIESCED, CLOSED + OPEN(true), QUIESCED(false), CLOSED(false); + private final boolean acceptingMails; + + State(boolean acceptingMails) { + this.acceptingMails = acceptingMails; + } + + public boolean isAcceptingMails() { + return acceptingMails; + } } /**