This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 04b5cbf07775c086b1df33f94b77a99acc3f4615 Author: Piotr Nowojski <[email protected]> AuthorDate: Fri Jun 21 11:57:24 2019 +0200 [FLINK-12479][operators] Integrate StreamInputProcessor with mailbox --- .../streaming/runtime/io/StreamInputProcessor.java | 7 +- .../runtime/io/StreamOneInputProcessor.java | 43 ++++--- .../runtime/io/StreamTwoInputProcessor.java | 21 +++- .../io/StreamTwoInputSelectableProcessor.java | 125 ++++++++++----------- .../flink/streaming/runtime/tasks/StreamTask.java | 9 +- .../tasks/StreamTaskSelectiveReadingTest.java | 10 +- 6 files changed, 122 insertions(+), 93 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java index 0b263d0..1de31bf 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.io.AvailabilityListener; import java.io.Closeable; @@ -26,9 +27,11 @@ import java.io.Closeable; * Interface for processing records by {@link org.apache.flink.streaming.runtime.tasks.StreamTask}. */ @Internal -public interface StreamInputProcessor extends Closeable { +public interface StreamInputProcessor extends AvailabilityListener, Closeable { /** - * @return true if {@link StreamTaskInput} is finished. + * @return true if {@link StreamInputProcessor} estimates that more records can be processed + * immediately. Otherwise false, which means that there are no more records available at the + * moment and the caller should check {@link #isFinished()} and/or {@link #isAvailable()}. */ boolean processInput() throws Exception; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamOneInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamOneInputProcessor.java index 4be6d54..3be72ee 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamOneInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamOneInputProcessor.java @@ -43,6 +43,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.concurrent.CompletableFuture; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -129,29 +130,29 @@ public final class StreamOneInputProcessor<IN> implements StreamInputProcessor { } @Override + public boolean isFinished() { + return input.isFinished(); + } + + @Override + public CompletableFuture<?> isAvailable() { + return input.isAvailable(); + } + + @Override public boolean processInput() throws Exception { initializeNumRecordsIn(); StreamElement recordOrMark = input.pollNextNullable(); - if (recordOrMark == null) { - input.isAvailable().get(); - return !checkFinished(); - } - int channel = input.getLastChannel(); - checkState(channel != StreamTaskInput.UNSPECIFIED); + if (recordOrMark != null) { + int channel = input.getLastChannel(); + checkState(channel != StreamTaskInput.UNSPECIFIED); - processElement(recordOrMark, channel); - return true; - } - - private boolean checkFinished() throws Exception { - boolean isFinished = input.isFinished(); - if (isFinished) { - synchronized (lock) { - operatorChain.endInput(1); - } + processElement(recordOrMark, channel); } - return isFinished; + checkFinished(); + + return recordOrMark != null; } private void processElement(StreamElement recordOrMark, int channel) throws Exception { @@ -180,6 +181,14 @@ public final class StreamOneInputProcessor<IN> implements StreamInputProcessor { } } + private void checkFinished() throws Exception { + if (input.isFinished()) { + synchronized (lock) { + operatorChain.endInput(1); + } + } + } + private void initializeNumRecordsIn() { if (numRecordsIn == null) { try { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java index 00bdab9..c888477 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java @@ -56,6 +56,7 @@ import java.io.IOException; import java.util.BitSet; import java.util.Collection; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -207,6 +208,19 @@ public final class StreamTwoInputProcessor<IN1, IN2> implements StreamInputProce } @Override + public boolean isFinished() { + return isFinished; + } + + @Override + public CompletableFuture<?> isAvailable() { + if (currentRecordDeserializer != null) { + return AVAILABLE; + } + return barrierHandler.isAvailable(); + } + + @Override public boolean processInput() throws Exception { if (isFinished) { return false; @@ -259,7 +273,6 @@ public final class StreamTwoInputProcessor<IN1, IN2> implements StreamInputProce streamOperator.processElement1(record); } return true; - } } else { @@ -295,15 +308,13 @@ public final class StreamTwoInputProcessor<IN1, IN2> implements StreamInputProce if (bufferOrEvent.isPresent()) { processBufferOrEvent(bufferOrEvent.get()); } else { - if (!barrierHandler.isFinished()) { - barrierHandler.isAvailable().get(); - } else { + if (barrierHandler.isFinished()) { isFinished = true; if (!barrierHandler.isEmpty()) { throw new IllegalStateException("Trailing data in checkpoint barrier handler."); } - return false; } + return false; } } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java index 65464db..4adc2dc 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java @@ -47,7 +47,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Collection; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -156,7 +155,21 @@ public final class StreamTwoInputSelectableProcessor<IN1, IN2> implements Stream this.lastReadInputIndex = 1; // always try to read from the first input this.isPrepared = false; + } + @Override + public boolean isFinished() { + return input1.isFinished() && input2.isFinished(); + } + + @Override + public CompletableFuture<?> isAvailable() { + if (inputSelection.isALLMaskOf2()) { + return isAnyInputAvailable(); + } else { + StreamTaskInput input = (inputSelection.getInputMask() == InputSelection.FIRST.getInputMask()) ? input1 : input2; + return input.isAvailable(); + } } @Override @@ -179,18 +192,29 @@ public final class StreamTwoInputSelectableProcessor<IN1, IN2> implements Stream if (recordOrMark != null) { processElement1(recordOrMark, input1.getLastChannel()); } + checkFinished(input1, lastReadInputIndex); } else { recordOrMark = input2.pollNextNullable(); if (recordOrMark != null) { processElement2(recordOrMark, input2.getLastChannel()); } + checkFinished(input2, lastReadInputIndex); } if (recordOrMark == null) { setUnavailableInput(readingInputIndex); } - return !checkFinished(); + return recordOrMark != null; + } + + private void checkFinished(StreamTaskInput input, int inputIndex) throws Exception { + if (input.isFinished()) { + synchronized (lock) { + operatorChain.endInput(getInputId(inputIndex)); + inputSelection = inputSelector.nextSelection(); + } + } } @Override @@ -213,14 +237,12 @@ public final class StreamTwoInputSelectableProcessor<IN1, IN2> implements Stream } } - private int selectNextReadingInputIndex() - throws InterruptedException, ExecutionException, IOException { - - int readingInputIndex; - while ((readingInputIndex = inputSelection.fairSelectNextIndexOutOf2(availableInputsMask, lastReadInputIndex)) == -1) { - if (!waitForAvailableInput(inputSelection)) { - return -1; - } + private int selectNextReadingInputIndex() throws IOException { + updateAvailability(); + checkInputSelectionAgainstIsFinished(); + int readingInputIndex = inputSelection.fairSelectNextIndexOutOf2(availableInputsMask, lastReadInputIndex); + if (readingInputIndex == -1) { + return -1; } // to avoid starvation, if the input selection is ALL and availableInputsMask is not ALL, @@ -234,6 +256,27 @@ public final class StreamTwoInputSelectableProcessor<IN1, IN2> implements Stream return readingInputIndex; } + private void checkInputSelectionAgainstIsFinished() throws IOException { + if (inputSelection.isALLMaskOf2()) { + return; + } + if (inputSelection.isInputSelected(1) && input1.isFinished()) { + throw new IOException("Can not make a progress: only first input is selected but it is already finished"); + } + if (inputSelection.isInputSelected(2) && input2.isFinished()) { + throw new IOException("Can not make a progress: only second input is selected but it is already finished"); + } + } + + private void updateAvailability() { + if (!input1.isFinished() && input1.isAvailable() == AVAILABLE) { + setAvailableInput(input1.getInputIndex()); + } + if (!input2.isFinished() && input2.isAvailable() == AVAILABLE) { + setAvailableInput(input2.getInputIndex()); + } + } + private void processElement1(StreamElement recordOrMark, int channel) throws Exception { if (recordOrMark.isRecord()) { StreamRecord<IN1> record = recordOrMark.asRecord(); @@ -304,64 +347,20 @@ public final class StreamTwoInputSelectableProcessor<IN1, IN2> implements Stream } } - /** - * @return false if both of the inputs are finished, true otherwise. - */ - private boolean waitForAvailableInput(InputSelection inputSelection) - throws ExecutionException, InterruptedException, IOException { - - if (inputSelection.isALLMaskOf2()) { - return waitForAvailableEitherInput(); - } else { - waitForOneInput( - (inputSelection.getInputMask() == InputSelection.FIRST.getInputMask()) ? input1 : input2); - return true; + private CompletableFuture<?> isAnyInputAvailable() { + if (input1.isFinished()) { + return input2.isFinished() ? AVAILABLE : input2.isAvailable(); } - } - - private boolean waitForAvailableEitherInput() - throws ExecutionException, InterruptedException { - CompletableFuture<?> future1 = input1.isFinished() ? UNAVAILABLE : input1.isAvailable(); - CompletableFuture<?> future2 = input2.isFinished() ? UNAVAILABLE : input2.isAvailable(); - - if (future1 == UNAVAILABLE && future2 == UNAVAILABLE) { - return false; + if (input2.isFinished()) { + return input1.isAvailable(); } - // block to wait for a available input - CompletableFuture.anyOf(future1, future2).get(); - - if (future1.isDone()) { - setAvailableInput(input1.getInputIndex()); - } - if (future2.isDone()) { - setAvailableInput(input2.getInputIndex()); - } + CompletableFuture<?> input1Available = input1.isAvailable(); + CompletableFuture<?> input2Available = input2.isAvailable(); - return true; - } - - private void waitForOneInput(StreamTaskInput input) - throws IOException, ExecutionException, InterruptedException { - - if (input.isFinished()) { - throw new IOException("Could not read the finished input: input" + (input.getInputIndex() + 1) + "."); - } - - input.isAvailable().get(); - setAvailableInput(input.getInputIndex()); - } - - private boolean checkFinished() throws Exception { - if (getInput(lastReadInputIndex).isFinished()) { - synchronized (lock) { - operatorChain.endInput(getInputId(lastReadInputIndex)); - inputSelection = inputSelector.nextSelection(); - } - } - - return input1.isFinished() && input2.isFinished(); + return (input1Available == AVAILABLE || input2Available == AVAILABLE) ? + AVAILABLE : CompletableFuture.anyOf(input1Available, input2Available); } private void setAvailableInput(int inputIndex) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 7960a2f..53142a6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -61,6 +61,7 @@ import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; import org.apache.flink.streaming.runtime.tasks.mailbox.execution.DefaultActionContext; import org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxExecutor; import org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxProcessor; +import org.apache.flink.streaming.runtime.tasks.mailbox.execution.SuspendedMailboxDefaultAction; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; @@ -267,7 +268,13 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> */ protected void performDefaultAction(DefaultActionContext context) throws Exception { if (!inputProcessor.processInput()) { - context.allActionsCompleted(); + if (inputProcessor.isFinished()) { + context.allActionsCompleted(); + } + else { + SuspendedMailboxDefaultAction suspendedDefaultAction = context.suspendDefaultAction(); + inputProcessor.isAvailable().thenRun(suspendedDefaultAction::resume); + } } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java index c7c1440..16e5bd1 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java @@ -39,7 +39,6 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedBlockingQueue; import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; /** @@ -104,13 +103,14 @@ public class StreamTaskSelectiveReadingTest { } @Test - public void testReadFinishedInput() { + public void testReadFinishedInput() throws Exception { try { testBase(new TestReadFinishedInputStreamOperator(), false, new ConcurrentLinkedQueue<>(), true); fail("should throw an IOException"); - } catch (Throwable t) { - assertTrue("wrong exception, should be IOException", - ExceptionUtils.findThrowableWithMessage(t, "Could not read the finished input: input1").isPresent()); + } catch (Exception t) { + if (!ExceptionUtils.findThrowableWithMessage(t, "only first input is selected but it is already finished").isPresent()) { + throw t; + } } }
