This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 48b54e3e40d91b4a79a56f327210e3e8e36557e9 Author: sunhaibotb <[email protected]> AuthorDate: Wed Jun 12 18:14:21 2019 +0800 [FLINK-11877][runtime] Implement the runtime handling of the InputSelectable interface --- .../flink/streaming/api/graph/StreamGraph.java | 6 +- .../api/operators/SimpleOperatorFactory.java | 5 + .../api/operators/StreamOperatorFactory.java | 5 + .../streaming/runtime/io/StreamInputProcessor.java | 2 +- .../streaming/runtime/io/StreamTaskInput.java | 5 + .../runtime/io/StreamTaskNetworkInput.java | 12 +- .../io/StreamTwoInputSelectableProcessor.java | 429 +++++++++++++++++++++ .../tasks/TwoInputSelectableStreamTask.java | 78 ++++ .../tasks/StreamTaskSelectiveReadingTest.java | 369 ++++++++++++++++++ .../runtime/tasks/TwoInputStreamTaskTest.java | 70 +++- .../tasks/TwoInputStreamTaskTestHarness.java | 10 +- .../flink/table/generated/GeneratedClass.java | 4 + .../table/runtime/CodeGenOperatorFactory.java | 6 + .../runtime/StreamTaskSelectiveReadingITCase.java | 171 ++++++++ 14 files changed, 1154 insertions(+), 18 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index 34ceaf1..19dd21f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -42,6 +42,7 @@ import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; import org.apache.flink.streaming.runtime.tasks.SourceStreamTask; import org.apache.flink.streaming.runtime.tasks.StreamIterationHead; import org.apache.flink.streaming.runtime.tasks.StreamIterationTail; +import org.apache.flink.streaming.runtime.tasks.TwoInputSelectableStreamTask; import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask; import org.apache.flink.util.OutputTag; @@ -225,7 +226,10 @@ public class 
StreamGraph extends StreamingPlan { TypeInformation<OUT> outTypeInfo, String operatorName) { - addNode(vertexID, slotSharingGroup, coLocationGroup, TwoInputStreamTask.class, taskOperatorFactory, operatorName); + Class<? extends AbstractInvokable> vertexClass = taskOperatorFactory.isOperatorSelectiveReading() ? + TwoInputSelectableStreamTask.class : TwoInputStreamTask.class; + + addNode(vertexID, slotSharingGroup, coLocationGroup, vertexClass, taskOperatorFactory, operatorName); TypeSerializer<OUT> outSerializer = (outTypeInfo != null) && !(outTypeInfo instanceof MissingTypeInfo) ? outTypeInfo.createSerializer(executionConfig) : null; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SimpleOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SimpleOperatorFactory.java index 4bf3124..8b2ea06 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SimpleOperatorFactory.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SimpleOperatorFactory.java @@ -88,6 +88,11 @@ public class SimpleOperatorFactory<OUT> implements StreamOperatorFactory<OUT> { } @Override + public boolean isOperatorSelectiveReading() { + return operator instanceof InputSelectable; + } + + @Override public boolean isOutputTypeConfigurable() { return operator instanceof OutputTypeConfigurable; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactory.java index 31d2684..9a493f9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactory.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactory.java @@ -59,6 +59,11 @@ public interface StreamOperatorFactory<OUT> extends Serializable { } /** + * 
Test whether the operator is selective reading one. + */ + boolean isOperatorSelectiveReading(); + + /** * If the stream operator need access to the output type information at {@link StreamGraph} * generation. This can be useful for cases where the output type is specified by the returns * method and, thus, after the stream operator has been created. diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java index faa14b5..203f633 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java @@ -107,7 +107,7 @@ public class StreamInputProcessor<IN> { inputGate, taskManagerConfig, taskName); - this.input = new StreamTaskNetworkInput(barrierHandler, inputSerializer, ioManager); + this.input = new StreamTaskNetworkInput(barrierHandler, inputSerializer, ioManager, 0); this.lock = checkNotNull(lock); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskInput.java index ea1d2c3..19fd765 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskInput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskInput.java @@ -35,4 +35,9 @@ public interface StreamTaskInput extends NullableAsyncDataInput<StreamElement>, * it is unspecified. */ int getLastChannel(); + + /** + * Returns the input index of this input. 
+ */ + int getInputIndex(); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java index c82d91e..85e7f46 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java @@ -53,6 +53,8 @@ public final class StreamTaskNetworkInput implements StreamTaskInput { private final RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers; + private final int inputIndex; + private int lastChannel = UNSPECIFIED; private RecordDeserializer<DeserializationDelegate<StreamElement>> currentRecordDeserializer = null; @@ -63,7 +65,8 @@ public final class StreamTaskNetworkInput implements StreamTaskInput { public StreamTaskNetworkInput( CheckpointBarrierHandler barrierHandler, TypeSerializer<?> inputSerializer, - IOManager ioManager) { + IOManager ioManager, + int inputIndex) { this.barrierHandler = barrierHandler; this.deserializationDelegate = new NonReusingDeserializationDelegate<>( new StreamElementSerializer<>(inputSerializer)); @@ -74,6 +77,8 @@ public final class StreamTaskNetworkInput implements StreamTaskInput { recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<>( ioManager.getSpillingDirectoriesPaths()); } + + this.inputIndex = inputIndex; } @Override @@ -132,6 +137,11 @@ public final class StreamTaskNetworkInput implements StreamTaskInput { } @Override + public int getInputIndex() { + return inputIndex; + } + + @Override public boolean isFinished() { return isFinished; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java new file 
mode 100644 index 0000000..7374f97 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java @@ -0,0 +1,429 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.io; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.SimpleCounter; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; +import org.apache.flink.streaming.api.operators.InputSelectable; +import org.apache.flink.streaming.api.operators.InputSelection; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.metrics.WatermarkGauge; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; +import org.apache.flink.util.ExceptionUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collection; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Input reader for {@link org.apache.flink.streaming.runtime.tasks.TwoInputSelectableStreamTask} + * in the case that the operator is InputSelectable. + * + * @param <IN1> The type of the records that arrive on the first input + * @param <IN2> The type of the records that arrive on the second input + */ +@Internal +public class StreamTwoInputSelectableProcessor<IN1, IN2> { + + private static final Logger LOG = LoggerFactory.getLogger(StreamTwoInputSelectableProcessor.class); + + private static final CompletableFuture<?> UNAVAILABLE = new CompletableFuture<>(); + + private final TwoInputStreamOperator<IN1, IN2, ?> streamOperator; + private final InputSelectable inputSelector; + + private final Object lock; + + private final StreamTaskInput input1; + private final StreamTaskInput input2; + + /** + * Valves that control how watermarks and stream statuses from the 2 inputs are forwarded. + */ + private final StatusWatermarkValve statusWatermarkValve1; + private final StatusWatermarkValve statusWatermarkValve2; + + /** + * Stream status for the two inputs. We need to keep track for determining when + * to forward stream status changes downstream. 
+ */ + private StreamStatus firstStatus; + private StreamStatus secondStatus; + + private int availableInputsMask; + + private int lastReadInputIndex; + + private InputSelection inputSelection; + + private Counter numRecordsIn; + + private boolean isPrepared; + + public StreamTwoInputSelectableProcessor( + Collection<InputGate> inputGates1, + Collection<InputGate> inputGates2, + TypeSerializer<IN1> inputSerializer1, + TypeSerializer<IN2> inputSerializer2, + Object lock, + IOManager ioManager, + StreamStatusMaintainer streamStatusMaintainer, + TwoInputStreamOperator<IN1, IN2, ?> streamOperator, + WatermarkGauge input1WatermarkGauge, + WatermarkGauge input2WatermarkGauge) { + + checkState(streamOperator instanceof InputSelectable); + + this.streamOperator = checkNotNull(streamOperator); + this.inputSelector = (InputSelectable) streamOperator; + + this.lock = checkNotNull(lock); + + InputGate unionedInputGate1 = InputGateUtil.createInputGate(inputGates1.toArray(new InputGate[0])); + InputGate unionedInputGate2 = InputGateUtil.createInputGate(inputGates2.toArray(new InputGate[0])); + + // create a Input instance for each input + this.input1 = new StreamTaskNetworkInput(new BarrierDiscarder(unionedInputGate1), inputSerializer1, ioManager, 0); + this.input2 = new StreamTaskNetworkInput(new BarrierDiscarder(unionedInputGate2), inputSerializer2, ioManager, 1); + + this.statusWatermarkValve1 = new StatusWatermarkValve( + unionedInputGate1.getNumberOfInputChannels(), + new ForwardingValveOutputHandler(streamOperator, lock, streamStatusMaintainer, input1WatermarkGauge, 0)); + this.statusWatermarkValve2 = new StatusWatermarkValve( + unionedInputGate2.getNumberOfInputChannels(), + new ForwardingValveOutputHandler(streamOperator, lock, streamStatusMaintainer, input2WatermarkGauge, 1)); + + this.firstStatus = StreamStatus.ACTIVE; + this.secondStatus = StreamStatus.ACTIVE; + + this.availableInputsMask = (int) new InputSelection.Builder().select(1).select(2).build().getInputMask(); 
+ + this.lastReadInputIndex = 1; // always try to read from the first input + + this.isPrepared = false; + + } + + public boolean processInput() throws Exception { + if (!isPrepared) { + // the preparations here are not placed in the constructor because all work in it + // must be executed after all operators are opened. + prepareForProcessing(); + } + + int readingInputIndex = selectNextReadingInputIndex(); + if (readingInputIndex == -1) { + return false; + } + lastReadInputIndex = readingInputIndex; + + StreamElement recordOrMark; + if (readingInputIndex == 0) { + recordOrMark = input1.pollNextNullable(); + if (recordOrMark != null) { + processElement1(recordOrMark, input1.getLastChannel()); + } + } else { + recordOrMark = input2.pollNextNullable(); + if (recordOrMark != null) { + processElement2(recordOrMark, input2.getLastChannel()); + } + } + + if (recordOrMark == null) { + setUnavailableInput(readingInputIndex); + } + + return !checkFinished(); + } + + public void cleanup() throws Exception { + Exception ex = null; + try { + input1.close(); + } catch (Exception e) { + ex = ExceptionUtils.firstOrSuppressed(e, ex); + } + + try { + input2.close(); + } catch (Exception e) { + ex = ExceptionUtils.firstOrSuppressed(e, ex); + } + + if (ex != null) { + throw ex; + } + } + + private int selectNextReadingInputIndex() + throws InterruptedException, ExecutionException, IOException { + + int readingInputIndex; + while ((readingInputIndex = inputSelection.fairSelectNextIndexOutOf2(availableInputsMask, lastReadInputIndex)) == -1) { + if (!waitForAvailableInput(inputSelection)) { + return -1; + } + } + + // to avoid starvation, if the input selection is ALL and availableInputsMask is not ALL, + // always try to check and set the availability of another input + // TODO: because this can be a costly operation (checking volatile inside CompletableFuture` + // this might be optimized to only check once per processed NetworkBuffer + if (availableInputsMask < 3 && 
inputSelection.isALLMaskOf2()) { + checkAndSetAvailable(1 - readingInputIndex); + } + + return readingInputIndex; + } + + private void processElement1(StreamElement recordOrMark, int channel) throws Exception { + if (recordOrMark.isRecord()) { + StreamRecord<IN1> record = recordOrMark.asRecord(); + synchronized (lock) { + numRecordsIn.inc(); + streamOperator.setKeyContextElement1(record); + streamOperator.processElement1(record); + inputSelection = inputSelector.nextSelection(); + } + } + else if (recordOrMark.isWatermark()) { + statusWatermarkValve1.inputWatermark(recordOrMark.asWatermark(), channel); + } else if (recordOrMark.isStreamStatus()) { + statusWatermarkValve1.inputStreamStatus(recordOrMark.asStreamStatus(), channel); + } else if (recordOrMark.isLatencyMarker()) { + synchronized (lock) { + streamOperator.processLatencyMarker1(recordOrMark.asLatencyMarker()); + } + } else { + throw new UnsupportedOperationException("Unknown type of StreamElement on input1"); + } + } + + private void processElement2(StreamElement recordOrMark, int channel) throws Exception { + if (recordOrMark.isRecord()) { + StreamRecord<IN2> record = recordOrMark.asRecord(); + synchronized (lock) { + numRecordsIn.inc(); + streamOperator.setKeyContextElement2(record); + streamOperator.processElement2(record); + inputSelection = inputSelector.nextSelection(); + } + } + else if (recordOrMark.isWatermark()) { + statusWatermarkValve2.inputWatermark(recordOrMark.asWatermark(), channel); + } else if (recordOrMark.isStreamStatus()) { + statusWatermarkValve2.inputStreamStatus(recordOrMark.asStreamStatus(), channel); + } else if (recordOrMark.isLatencyMarker()) { + synchronized (lock) { + streamOperator.processLatencyMarker2(recordOrMark.asLatencyMarker()); + } + } else { + throw new UnsupportedOperationException("Unknown type of StreamElement on input2"); + } + } + + private void prepareForProcessing() { + // Note: the first call to nextSelection () on the operator must be made after this 
operator + // is opened to ensure that any changes about the input selection in its open() + // method take effect. + inputSelection = inputSelector.nextSelection(); + + try { + numRecordsIn = ((OperatorMetricGroup) streamOperator + .getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter(); + } catch (Exception e) { + LOG.warn("An exception occurred during the metrics setup.", e); + numRecordsIn = new SimpleCounter(); + } + + isPrepared = true; + } + + private void checkAndSetAvailable(int inputIndex) { + StreamTaskInput input = getInput(inputIndex); + if (!input.isFinished() && input.isAvailable().isDone()) { + setAvailableInput(inputIndex); + } + } + + /** + * @return false if both of the inputs are finished, true otherwise. + */ + private boolean waitForAvailableInput(InputSelection inputSelection) + throws ExecutionException, InterruptedException, IOException { + + if (inputSelection.isALLMaskOf2()) { + return waitForAvailableEitherInput(); + } else { + waitForOneInput( + (inputSelection.getInputMask() == InputSelection.FIRST.getInputMask()) ? input1 : input2); + return true; + } + } + + private boolean waitForAvailableEitherInput() + throws ExecutionException, InterruptedException { + + CompletableFuture<?> future1 = input1.isFinished() ? UNAVAILABLE : input1.isAvailable(); + CompletableFuture<?> future2 = input2.isFinished() ? 
UNAVAILABLE : input2.isAvailable(); + + if (future1 == UNAVAILABLE && future2 == UNAVAILABLE) { + return false; + } + + // block to wait for a available input + CompletableFuture.anyOf(future1, future2).get(); + + if (future1.isDone()) { + setAvailableInput(input1.getInputIndex()); + } + if (future2.isDone()) { + setAvailableInput(input2.getInputIndex()); + } + + return true; + } + + private void waitForOneInput(StreamTaskInput input) + throws IOException, ExecutionException, InterruptedException { + + if (input.isFinished()) { + throw new IOException("Could not read the finished input: input" + (input.getInputIndex() + 1) + "."); + } + + input.isAvailable().get(); + setAvailableInput(input.getInputIndex()); + } + + private boolean checkFinished() { + if (getInput(lastReadInputIndex).isFinished()) { + inputSelection = (lastReadInputIndex == 0) ? InputSelection.SECOND : InputSelection.FIRST; + // TODO: adds the runtime handling of the BoundedMultiInput interface; + } + + return input1.isFinished() && input2.isFinished(); + } + + private void setAvailableInput(int inputIndex) { + availableInputsMask |= 1 << inputIndex; + } + + private void setUnavailableInput(int inputIndex) { + availableInputsMask &= ~(1 << inputIndex); + } + + private StreamTaskInput getInput(int inputIndex) { + return inputIndex == 0 ? 
input1 : input2; + } + + private class ForwardingValveOutputHandler implements StatusWatermarkValve.ValveOutputHandler { + + private final TwoInputStreamOperator<IN1, IN2, ?> operator; + + private final Object lock; + + private final StreamStatusMaintainer streamStatusMaintainer; + + private final WatermarkGauge inputWatermarkGauge; + + private final int inputIndex; + + private ForwardingValveOutputHandler( + TwoInputStreamOperator<IN1, IN2, ?> operator, + Object lock, + StreamStatusMaintainer streamStatusMaintainer, + WatermarkGauge inputWatermarkGauge, + int inputIndex) { + + this.operator = checkNotNull(operator); + this.lock = checkNotNull(lock); + + this.streamStatusMaintainer = checkNotNull(streamStatusMaintainer); + + this.inputWatermarkGauge = inputWatermarkGauge; + + this.inputIndex = inputIndex; + } + + @Override + public void handleWatermark(Watermark watermark) { + try { + synchronized (lock) { + inputWatermarkGauge.setCurrentWatermark(watermark.getTimestamp()); + if (inputIndex == 0) { + operator.processWatermark1(watermark); + } else { + operator.processWatermark2(watermark); + } + } + } catch (Exception e) { + throw new RuntimeException("Exception occurred while processing valve output watermark of input" + + (inputIndex + 1) + ": ", e); + } + } + + @Override + public void handleStreamStatus(StreamStatus streamStatus) { + try { + synchronized (lock) { + final StreamStatus anotherStreamStatus; + if (inputIndex == 0) { + firstStatus = streamStatus; + anotherStreamStatus = secondStatus; + } else { + secondStatus = streamStatus; + anotherStreamStatus = firstStatus; + } + + // check if we need to toggle the task's stream status + if (!streamStatus.equals(streamStatusMaintainer.getStreamStatus())) { + if (streamStatus.isActive()) { + // we're no longer idle if at least one input has become active + streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE); + } else if (anotherStreamStatus.isIdle()) { + // we're idle once both inputs are idle + 
streamStatusMaintainer.toggleStreamStatus(StreamStatus.IDLE); + } + } + } + } catch (Exception e) { + throw new RuntimeException("Exception occurred while processing valve output stream status of input" + + (inputIndex + 1) + ": ", e); + } + } + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputSelectableStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputSelectableStreamTask.java new file mode 100644 index 0000000..33e1f50 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputSelectableStreamTask.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.runtime.io.StreamTwoInputSelectableProcessor; + +import java.util.Collection; + +/** + * A {@link StreamTask} for executing a {@link TwoInputStreamOperator} and supporting + * the {@link TwoInputStreamOperator} to select input for reading. + */ +@Internal +public class TwoInputSelectableStreamTask<IN1, IN2, OUT> extends AbstractTwoInputStreamTask<IN1, IN2, OUT> { + + private StreamTwoInputSelectableProcessor<IN1, IN2> inputProcessor; + + public TwoInputSelectableStreamTask(Environment env) { + super(env); + } + + @Override + protected void createInputProcessor( + Collection<InputGate> inputGates1, + Collection<InputGate> inputGates2, + TypeSerializer<IN1> inputDeserializer1, + TypeSerializer<IN2> inputDeserializer2) { + + this.inputProcessor = new StreamTwoInputSelectableProcessor<>( + inputGates1, inputGates2, + inputDeserializer1, inputDeserializer2, + this, + getEnvironment().getIOManager(), + getStreamStatusMaintainer(), + this.headOperator, + input1WatermarkGauge, + input2WatermarkGauge); + } + + @Override + protected void performDefaultAction(ActionContext context) throws Exception { + if (!inputProcessor.processInput()) { + context.allActionsCompleted(); + } + } + + @Override + protected void cleanup() throws Exception { + if (inputProcessor != null) { + inputProcessor.cleanup(); + } + } + + @Override + protected void cancelTask() { + + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java new file mode 100644 index 0000000..a56dc9b --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.InputSelectable; +import org.apache.flink.streaming.api.operators.InputSelection; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.TestHarnessUtil; +import org.apache.flink.util.ExceptionUtils; + +import org.junit.Test; + +import java.util.Arrays; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Test selective reading. 
+ */ +public class StreamTaskSelectiveReadingTest { + + @Test + public void testAnyOrderedReading() throws Exception { + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + expectedOutput.add(new StreamRecord<>("[Operator0-1]: Hello-1")); + expectedOutput.add(new StreamRecord<>("[Operator0-2]: 1")); + expectedOutput.add(new StreamRecord<>("[Operator0-1]: Hello-2")); + expectedOutput.add(new StreamRecord<>("[Operator0-2]: 2")); + expectedOutput.add(new StreamRecord<>("[Operator0-1]: Hello-3")); + expectedOutput.add(new StreamRecord<>("[Operator0-2]: 3")); + expectedOutput.add(new StreamRecord<>("[Operator0-2]: 4")); + + testBase(new AnyReadingStreamOperator("Operator0"), true, expectedOutput, true); + } + + @Test + public void testAnyUnorderedReading() throws Exception { + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + expectedOutput.add(new StreamRecord<>("[Operator0-1]: Hello-1")); + expectedOutput.add(new StreamRecord<>("[Operator0-2]: 1")); + expectedOutput.add(new StreamRecord<>("[Operator0-1]: Hello-2")); + expectedOutput.add(new StreamRecord<>("[Operator0-2]: 2")); + expectedOutput.add(new StreamRecord<>("[Operator0-1]: Hello-3")); + expectedOutput.add(new StreamRecord<>("[Operator0-2]: 3")); + expectedOutput.add(new StreamRecord<>("[Operator0-2]: 4")); + + testBase(new AnyReadingStreamOperator("Operator0"), false, expectedOutput, false); + } + + @Test + public void testSequentialReading() throws Exception { + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + expectedOutput.add(new StreamRecord<>("[Operator0-1]: Hello-1")); + expectedOutput.add(new StreamRecord<>("[Operator0-1]: Hello-2")); + expectedOutput.add(new StreamRecord<>("[Operator0-1]: Hello-3")); + expectedOutput.add(new StreamRecord<>("[Operator0-2]: 1")); + expectedOutput.add(new StreamRecord<>("[Operator0-2]: 2")); + expectedOutput.add(new StreamRecord<>("[Operator0-2]: 3")); + expectedOutput.add(new 
StreamRecord<>("[Operator0-2]: 4")); + + testBase(new SequentialReadingStreamOperator("Operator0"), false, expectedOutput, true); + } + + @Test + public void testSpecialRuleReading() throws Exception { + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + expectedOutput.add(new StreamRecord<>("[Operator0-1]: Hello-1")); + expectedOutput.add(new StreamRecord<>("[Operator0-1]: Hello-2")); + expectedOutput.add(new StreamRecord<>("[Operator0-2]: 1")); + expectedOutput.add(new StreamRecord<>("[Operator0-2]: 2")); + expectedOutput.add(new StreamRecord<>("[Operator0-1]: Hello-3")); + expectedOutput.add(new StreamRecord<>("[Operator0-2]: 3")); + expectedOutput.add(new StreamRecord<>("[Operator0-2]: 4")); + + testBase(new SpecialRuleReadingStreamOperator("Operator0", 3, 4, 2), false, expectedOutput, true); + } + + @Test + public void testReadFinishedInput() { + try { + testBase(new TestReadFinishedInputStreamOperator(), false, new ConcurrentLinkedQueue<>(), true); + fail("should throw an IOException"); + } catch (Exception t) { // not Throwable: the AssertionError thrown by fail() must propagate + assertTrue("wrong exception, should be IOException", + ExceptionUtils.findThrowableWithMessage(t, "Could not read the finished input: input1").isPresent()); + } + } + + private void testBase( + TwoInputStreamOperator<String, Integer, String> streamOperator, + boolean prepareDataBeforeProcessing, + ConcurrentLinkedQueue<Object> expectedOutput, + boolean orderedCheck) throws Exception { + + final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness = new TwoInputStreamTaskTestHarness<>( + TestSelectiveReadingTask::new, + BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setupOutputForSingletonOperatorChain(); + StreamConfig streamConfig = testHarness.getStreamConfig(); + streamConfig.setStreamOperator(streamOperator); + streamConfig.setOperatorID(new OperatorID()); + + testHarness.invoke(); + testHarness.waitForTaskRunning(); + + boolean isProcessing
= false; + if (!prepareDataBeforeProcessing) { + ((TestSelectiveReadingTask) testHarness.getTask()).startProcessing(); + isProcessing = true; + } + + testHarness.processElement(new StreamRecord<>("Hello-1"), 0, 0); + + // wait until the input is processed to test the listening and blocking logic + if (!prepareDataBeforeProcessing) { + testHarness.waitForInputProcessing(); + } + + testHarness.processElement(new StreamRecord<>("Hello-2"), 0, 0); + testHarness.processElement(new StreamRecord<>("Hello-3"), 0, 0); + + testHarness.processElement(new StreamRecord<>(1), 1, 0); + testHarness.processElement(new StreamRecord<>(2), 1, 0); + testHarness.processElement(new StreamRecord<>(3), 1, 0); + testHarness.processElement(new StreamRecord<>(4), 1, 0); + + testHarness.endInput(); + + if (!isProcessing) { + ((TestSelectiveReadingTask) testHarness.getTask()).startProcessing(); + } + testHarness.waitForTaskCompletion(10_000L); + + LinkedBlockingQueue<Object> output = testHarness.getOutput(); + if (orderedCheck) { + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, output); + } else { + String[] expectedResult = expectedOutput.stream() + .map(record -> ((StreamRecord) record).getValue().toString()) + .toArray(String[]::new); + Arrays.sort(expectedResult); + + String[] result = output.stream() + .map(record -> ((StreamRecord) record).getValue().toString()) + .toArray(String[]::new); + Arrays.sort(result); + + assertArrayEquals("Output was not correct.", expectedResult, result); + } + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + private static class TestSelectiveReadingTask<IN1, IN2, OUT> extends TwoInputSelectableStreamTask<IN1, IN2, OUT> { + + private volatile boolean started; + + TestSelectiveReadingTask(Environment env) { + super(env); + started = false; + } + + @Override + protected void
performDefaultAction(ActionContext context) throws Exception { + synchronized (this) { // check the flag under the lock and loop: avoids a lost wake-up from a racing startProcessing() and spurious wake-ups + while (!started) { + this.wait(); + } + } + + super.performDefaultAction(context); + } + + public void startProcessing() { + started = true; + synchronized (this) { + this.notifyAll(); + } + } + } + + private static class AnyReadingStreamOperator extends AbstractStreamOperator<String> + implements TwoInputStreamOperator<String, Integer, String>, InputSelectable { + + private final String name; + + AnyReadingStreamOperator(String name) { + super(); + + this.name = name; + } + + @Override + public InputSelection nextSelection() { + return InputSelection.ALL; + } + + @Override + public void processElement1(StreamRecord<String> element) { + output.collect(element.replace("[" + name + "-1]: " + element.getValue())); + } + + @Override + public void processElement2(StreamRecord<Integer> element) { + output.collect(element.replace("[" + name + "-2]: " + element.getValue())); + } + } + + /** + * Test operator for sequential reading.
+ */ + public static class SequentialReadingStreamOperator extends AbstractStreamOperator<String> + implements TwoInputStreamOperator<String, Integer, String>, InputSelectable { + + private final String name; + + private InputSelection inputSelection; + + public SequentialReadingStreamOperator(String name) { + super(); + + this.name = name; + this.inputSelection = InputSelection.FIRST; + } + + @Override + public InputSelection nextSelection() { + return inputSelection; + } + + @Override + public void processElement1(StreamRecord<String> element) { + output.collect(element.replace("[" + name + "-1]: " + element.getValue())); + } + + @Override + public void processElement2(StreamRecord<Integer> element) { + output.collect(element.replace("[" + name + "-2]: " + element.getValue())); + + this.inputSelection = InputSelection.SECOND; + } + } + + private static class SpecialRuleReadingStreamOperator extends AbstractStreamOperator<String> + implements TwoInputStreamOperator<String, Integer, String>, InputSelectable { + + private final String name; + + private final int input1Records; + private final int input2Records; + + private final int maxContinuousReadingRecords; + + private int input1ReadingRecords; + private int input2ReadingRecords; + + private int continuousReadingRecords; + private InputSelection inputSelection; + + SpecialRuleReadingStreamOperator(String name, int input1Records, int input2Records, int maxContinuousReadingRecords) { + super(); + + this.name = name; + this.input1Records = input1Records; + this.input2Records = input2Records; + this.maxContinuousReadingRecords = maxContinuousReadingRecords; + + this.input1ReadingRecords = 0; + this.input2ReadingRecords = 0; + this.continuousReadingRecords = 0; + this.inputSelection = InputSelection.FIRST; + } + + @Override + public InputSelection nextSelection() { + return inputSelection; + } + + @Override + public void processElement1(StreamRecord<String> element) { + output.collect(element.replace("[" + name + 
"-1]: " + element.getValue())); + + input1ReadingRecords++; + continuousReadingRecords++; + if (continuousReadingRecords == maxContinuousReadingRecords) { + continuousReadingRecords = 0; + if (input2ReadingRecords < input2Records) { + inputSelection = InputSelection.SECOND; + return; + } + } + + inputSelection = InputSelection.FIRST; + } + + @Override + public void processElement2(StreamRecord<Integer> element) { + output.collect(element.replace("[" + name + "-2]: " + element.getValue())); + + input2ReadingRecords++; + continuousReadingRecords++; + if (continuousReadingRecords == maxContinuousReadingRecords) { + continuousReadingRecords = 0; + if (input1ReadingRecords < input1Records) { + inputSelection = InputSelection.FIRST; + return; + } + } + + inputSelection = InputSelection.SECOND; + } + } + + private static class TestReadFinishedInputStreamOperator extends AbstractStreamOperator<String> + implements TwoInputStreamOperator<String, Integer, String>, InputSelectable { + + private InputSelection inputSelection; + + TestReadFinishedInputStreamOperator() { + super(); + + this.inputSelection = InputSelection.FIRST; + } + + @Override + public InputSelection nextSelection() { + return inputSelection; + } + + @Override + public void processElement1(StreamRecord<String> element) { + + } + + @Override + public void processElement2(StreamRecord<Integer> element) { + + } + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java index f80983f..b30a789 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java @@ -40,6 +40,8 @@ import org.apache.flink.streaming.api.functions.co.CoMapFunction; import 
org.apache.flink.streaming.api.functions.co.RichCoMapFunction; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.InputSelectable; +import org.apache.flink.streaming.api.operators.InputSelection; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.operators.co.CoStreamMap; import org.apache.flink.streaming.api.watermark.Watermark; @@ -49,6 +51,8 @@ import org.apache.flink.streaming.util.TestHarnessUtil; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.util.Arrays; import java.util.HashSet; @@ -58,16 +62,26 @@ import java.util.concurrent.ConcurrentLinkedQueue; import static org.junit.Assert.assertEquals; /** - * Tests for {@link org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask}. Theses tests - * implicitly also test the {@link org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor}. + * Tests for {@link TwoInputStreamTask} and {@link TwoInputSelectableStreamTask}. Theses tests + * implicitly also test the {@link org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor} + * and {@link org.apache.flink.streaming.runtime.io.StreamTwoInputSelectableProcessor}. * * <p>Note:<br> * We only use a {@link CoStreamMap} operator here. We also test the individual operators but Map is * used as a representative to test TwoInputStreamTask, since TwoInputStreamTask is used for all * TwoInputStreamOperators. */ +@RunWith(Parameterized.class) public class TwoInputStreamTaskTest { + @Parameterized.Parameter + public boolean isInputSelectable; + + @Parameterized.Parameters(name = "isInputSelectable = {0}") + public static List<Boolean> parameters() { + return Arrays.asList(Boolean.FALSE, Boolean.TRUE); + } + /** * This test verifies that open() and close() are correctly called. 
This test also verifies * that timestamps of emitted elements are correct. {@link CoStreamMap} assigns the input @@ -78,12 +92,13 @@ public class TwoInputStreamTaskTest { public void testOpenCloseAndTimestamps() throws Exception { final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness = new TwoInputStreamTaskTestHarness<>( - TwoInputStreamTask::new, + isInputSelectable ? TwoInputSelectableStreamTask::new : TwoInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); testHarness.setupOutputForSingletonOperatorChain(); StreamConfig streamConfig = testHarness.getStreamConfig(); - CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<String, Integer, String>(new TestOpenCloseMapFunction()); + CoStreamMap<String, Integer, String> coMapOperator = isInputSelectable ? + new AnyReadingCoStreamMap<>(new TestOpenCloseMapFunction()) : new CoStreamMap<>(new TestOpenCloseMapFunction()); streamConfig.setStreamOperator(coMapOperator); streamConfig.setOperatorID(new OperatorID()); @@ -125,7 +140,7 @@ public class TwoInputStreamTaskTest { final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness = new TwoInputStreamTaskTestHarness<String, Integer, String>( - TwoInputStreamTask::new, + isInputSelectable ? TwoInputSelectableStreamTask::new : TwoInputStreamTask::new, 2, 2, new int[] {1, 2}, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, @@ -133,7 +148,8 @@ public class TwoInputStreamTaskTest { testHarness.setupOutputForSingletonOperatorChain(); StreamConfig streamConfig = testHarness.getStreamConfig(); - CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<String, Integer, String>(new IdentityMap()); + CoStreamMap<String, Integer, String> coMapOperator = isInputSelectable ? 
+ new AnyReadingCoStreamMap<>(new IdentityMap()) : new CoStreamMap<>(new IdentityMap()); streamConfig.setStreamOperator(coMapOperator); streamConfig.setOperatorID(new OperatorID()); @@ -235,6 +251,11 @@ public class TwoInputStreamTaskTest { @Test @SuppressWarnings("unchecked") public void testCheckpointBarriers() throws Exception { + if (isInputSelectable) { + // In the case of selective reading, checkpoints are not currently supported, and we skip this test + return; + } + final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness = new TwoInputStreamTaskTestHarness<String, Integer, String>( TwoInputStreamTask::new, @@ -319,6 +340,11 @@ public class TwoInputStreamTaskTest { @Test @SuppressWarnings("unchecked") public void testOvertakingCheckpointBarriers() throws Exception { + if (isInputSelectable) { + // In the case of selective reading, checkpoints are not currently supported, and we skip this test + return; + } + final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness = new TwoInputStreamTaskTestHarness<>( TwoInputStreamTask::new, @@ -395,7 +421,9 @@ public class TwoInputStreamTaskTest { @Test public void testOperatorMetricReuse() throws Exception { - final TwoInputStreamTaskTestHarness<String, String, String> testHarness = new TwoInputStreamTaskTestHarness<>(TwoInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + final TwoInputStreamTaskTestHarness<String, String, String> testHarness = new TwoInputStreamTaskTestHarness<>( + isInputSelectable ? 
TwoInputSelectableStreamTask::new : TwoInputStreamTask::new, + BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); testHarness.setupOperatorChain(new OperatorID(), new DuplicatingOperator()) .chain(new OperatorID(), new OneInputStreamTaskTest.DuplicatingOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig())) @@ -442,7 +470,8 @@ public class TwoInputStreamTaskTest { testHarness.waitForTaskCompletion(); } - static class DuplicatingOperator extends AbstractStreamOperator<String> implements TwoInputStreamOperator<String, String, String> { + static class DuplicatingOperator extends AbstractStreamOperator<String> + implements TwoInputStreamOperator<String, String, String>, InputSelectable { @Override public void processElement1(StreamRecord<String> element) { @@ -455,13 +484,21 @@ public class TwoInputStreamTaskTest { output.collect(element); output.collect(element); } + + @Override + public InputSelection nextSelection() { + return InputSelection.ALL; + } } @Test public void testWatermarkMetrics() throws Exception { - final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness = new TwoInputStreamTaskTestHarness<>(TwoInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness = new TwoInputStreamTaskTestHarness<>( + isInputSelectable ? TwoInputSelectableStreamTask::new : TwoInputStreamTask::new, + BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); - CoStreamMap<String, Integer, String> headOperator = new CoStreamMap<>(new IdentityMap()); + CoStreamMap<String, Integer, String> headOperator = isInputSelectable ? 
+ new AnyReadingCoStreamMap<>(new IdentityMap()) : new CoStreamMap<>(new IdentityMap()); final OperatorID headOperatorId = new OperatorID(); OneInputStreamTaskTest.WatermarkMetricOperator chainedOperator = new OneInputStreamTaskTest.WatermarkMetricOperator(); @@ -621,5 +658,18 @@ public class TwoInputStreamTaskTest { return value.toString(); } } + + private static class AnyReadingCoStreamMap<IN1, IN2, OUT> extends CoStreamMap<IN1, IN2, OUT> + implements InputSelectable { + + public AnyReadingCoStreamMap(CoMapFunction<IN1, IN2, OUT> mapper) { + super(mapper); + } + + @Override + public InputSelection nextSelection() { + return InputSelection.ALL; + } + } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java index 4c1c424..aa900ed 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java @@ -36,7 +36,7 @@ import java.util.function.Function; /** - * Test harness for testing a {@link TwoInputStreamTask}. + * Test harness for testing a {@link TwoInputStreamTask} or a {@link TwoInputSelectableStreamTask}. * * <p>This mock Invokable provides the task with a basic runtime context and allows pushing elements * and watermarks into the task. {@link #getOutput()} can be used to get the emitted elements @@ -71,7 +71,7 @@ public class TwoInputStreamTaskTestHarness<IN1, IN2, OUT> extends StreamTaskTest * it should be assigned to the first (1), or second (2) input of the task. */ public TwoInputStreamTaskTestHarness( - Function<Environment, ? extends TwoInputStreamTask<IN1, IN2, OUT>> taskFactory, + Function<Environment, ? 
extends AbstractTwoInputStreamTask<IN1, IN2, OUT>> taskFactory, int numInputGates, int numInputChannelsPerGate, int[] inputGateAssignment, @@ -98,7 +98,7 @@ public class TwoInputStreamTaskTestHarness<IN1, IN2, OUT> extends StreamTaskTest * second task input. */ public TwoInputStreamTaskTestHarness( - Function<Environment, ? extends TwoInputStreamTask<IN1, IN2, OUT>> taskFactory, + Function<Environment, ? extends AbstractTwoInputStreamTask<IN1, IN2, OUT>> taskFactory, TypeInformation<IN1> inputType1, TypeInformation<IN2> inputType2, TypeInformation<OUT> outputType) { @@ -169,8 +169,8 @@ public class TwoInputStreamTaskTestHarness<IN1, IN2, OUT> extends StreamTaskTest @Override @SuppressWarnings("unchecked") - public TwoInputStreamTask<IN1, IN2, OUT> getTask() { - return (TwoInputStreamTask<IN1, IN2, OUT>) super.getTask(); + public AbstractTwoInputStreamTask<IN1, IN2, OUT> getTask() { + return (AbstractTwoInputStreamTask<IN1, IN2, OUT>) super.getTask(); } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedClass.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedClass.java index 92af598..97495e5 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedClass.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedClass.java @@ -91,4 +91,8 @@ public abstract class GeneratedClass<T> implements Serializable { public Object[] getReferences() { return references; } + + public Class<?> getClass(ClassLoader classLoader) { + return compile(classLoader); + } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/CodeGenOperatorFactory.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/CodeGenOperatorFactory.java index 7f1d6f8..61d6fb8 100644 --- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/CodeGenOperatorFactory.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/CodeGenOperatorFactory.java @@ -19,6 +19,7 @@ package org.apache.flink.table.runtime; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.InputSelectable; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.StreamOperatorFactory; @@ -56,6 +57,11 @@ public class CodeGenOperatorFactory<OUT> implements StreamOperatorFactory<OUT> { return strategy; } + @Override + public boolean isOperatorSelectiveReading() { + return InputSelectable.class.isAssignableFrom(generatedClass.getClass(Thread.currentThread().getContextClassLoader())); + } + public GeneratedClass<? extends StreamOperator<OUT>> getGeneratedClass() { return generatedClass; } diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskSelectiveReadingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskSelectiveReadingITCase.java new file mode 100644 index 0000000..693de32 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskSelectiveReadingITCase.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.streaming.runtime; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.runtime.tasks.StreamTaskSelectiveReadingTest.SequentialReadingStreamOperator; +import org.apache.flink.test.streaming.runtime.util.TestListResultSink; + +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for selective reading of {@code TwoInputSelectableStreamTask}. 
+ */ +public class StreamTaskSelectiveReadingITCase { + @Test + public void testSequentialReading() throws Exception { + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + + DataStream<String> source0 = env.addSource( + new TestStringSource("Source0", + new String[] { + "Hello-1", "Hello-2", "Hello-3", "Hello-4", "Hello-5", "Hello-6" + })); + DataStream<Integer> source1 = env.addSource( + new TestIntegerSource("Source1", + new Integer[] { + 1, 2, 3 + })) + .setParallelism(2); + TestListResultSink<String> resultSink = new TestListResultSink<>(); + + TwoInputStreamOperator<String, Integer, String> twoInputStreamOperator = new SequentialReadingStreamOperator("Operator0"); + twoInputStreamOperator.setChainingStrategy(ChainingStrategy.NEVER); + + source0.connect(source1) + .transform( + "Custom Operator", + BasicTypeInfo.STRING_TYPE_INFO, + twoInputStreamOperator + ) + .addSink(resultSink); + + env.execute("Selective reading test"); + + List<String> result = resultSink.getResult(); + + List<String> expected1 = Arrays.asList( + "[Operator0-1]: [Source0-0]: Hello-1", + "[Operator0-1]: [Source0-0]: Hello-2", + "[Operator0-1]: [Source0-0]: Hello-3", + "[Operator0-1]: [Source0-0]: Hello-4", + "[Operator0-1]: [Source0-0]: Hello-5", + "[Operator0-1]: [Source0-0]: Hello-6" + ); + + List<String> expected2 = Arrays.asList( + "[Operator0-2]: 1", + "[Operator0-2]: 2", + "[Operator0-2]: 3", + "[Operator0-2]: 2", + "[Operator0-2]: 4", + "[Operator0-2]: 6" + ); + Collections.sort(expected2); + + assertEquals(expected1.size() + expected2.size(), result.size()); + assertEquals(expected1, result.subList(0, expected1.size())); + + List<String> result2 = result.subList(expected1.size(), expected1.size() + expected2.size()); + Collections.sort(result2); + assertEquals(expected2, result2); + } + + private abstract static class TestSource<T> extends RichParallelSourceFunction<T> { + private static final long serialVersionUID = 
1L; + + protected final String name; + + private volatile boolean running = true; + private transient RuntimeContext context; + + private final T[] elements; + + public TestSource(String name, T[] elements) { + this.name = name; + this.elements = elements; + } + + @Override + public void open(Configuration parameters) throws Exception { + this.context = getRuntimeContext(); + } + + @Override + public void run(SourceContext<T> ctx) throws Exception { + int elementIndex = 0; + while (running) { + if (elementIndex < elements.length) { + synchronized (ctx.getCheckpointLock()) { + ctx.collect(outValue(elements[elementIndex], context.getIndexOfThisSubtask())); + elementIndex++; + } + } else { + break; + } + } + } + + @Override + public void cancel() { + running = false; + } + + protected abstract T outValue(T inValue, int subTaskIndex); + } + + private static class TestStringSource extends TestSource<String> { + + public TestStringSource(String name, String[] elements) { + super(name, elements); + } + + @Override + protected String outValue(String inValue, int subTaskIndex) { + return "[" + name + "-" + subTaskIndex + "]: " + inValue; + } + } + + private static class TestIntegerSource extends TestSource<Integer> { + + public TestIntegerSource(String name, Integer[] elements) { + super(name, elements); + } + + @Override + protected Integer outValue(Integer inValue, int subTaskIndex) { + return inValue * (subTaskIndex + 1); + } + } +}
