This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6c7a69a0442d6bfb3b5d86006db462756a51ec7c Author: Piotr Nowojski <[email protected]> AuthorDate: Tue Jun 18 17:26:47 2019 +0200 [FLINK-12777][operator] Use CheckpointedInputGate StreamTwoInputSelectableProcessor --- .../runtime/io/CheckpointBarrierDiscarder.java | 74 ---------------------- .../runtime/io/CheckpointedInputGate.java | 23 ++++++- .../streaming/runtime/io/InputProcessorUtil.java | 41 ++++++++++++ .../io/StreamTwoInputSelectableProcessor.java | 23 +++++-- .../tasks/TwoInputSelectableStreamTask.java | 7 +- 5 files changed, 86 insertions(+), 82 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierDiscarder.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierDiscarder.java deleted file mode 100644 index 4c6cdab..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierDiscarder.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.io; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; -import org.apache.flink.runtime.io.network.api.CheckpointBarrier; - -import java.io.IOException; - -/** - * The {@link CheckpointBarrierDiscarder} discards checkpoint barriers have been received from which input channels. - */ -@Internal -public class CheckpointBarrierDiscarder extends CheckpointBarrierHandler { - public CheckpointBarrierDiscarder() { - super(null); - } - - @Override - public void releaseBlocksAndResetBarriers() throws IOException { - } - - @Override - public boolean isBlocked(int channelIndex) { - return false; - } - - @Override - public boolean processBarrier(CheckpointBarrier receivedBarrier, int channelIndex, long bufferedBytes) throws Exception { - return false; - } - - @Override - public boolean processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws Exception { - return false; - } - - @Override - public boolean processEndOfPartition() throws Exception { - return false; - } - - @Override - public long getLatestCheckpointId() { - return 0; - } - - @Override - public long getAlignmentDurationNanos() { - return 0; - } - - @Override - public void checkpointSizeLimitExceeded(long maxBufferedBytes) throws Exception { - - } -} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java index 7604d0a..ce80e30 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java @@ -52,6 +52,8 @@ public class CheckpointedInputGate implements AsyncDataInput<BufferOrEvent> { /** The gate that the buffer draws its input from. */ private final InputGate inputGate; + private final int channelIndexOffset; + private final BufferStorage bufferStorage; /** Flag to indicate whether we have drawn all available input. */ @@ -89,6 +91,13 @@ public class CheckpointedInputGate implements AsyncDataInput<BufferOrEvent> { ); } + public CheckpointedInputGate( + InputGate inputGate, + BufferStorage bufferStorage, + CheckpointBarrierHandler barrierHandler) { + this(inputGate, bufferStorage, barrierHandler, 0); + } + /** * Creates a new checkpoint stream aligner. * @@ -99,12 +108,16 @@ public class CheckpointedInputGate implements AsyncDataInput<BufferOrEvent> { * @param inputGate The input gate to draw the buffers and events from. * @param bufferStorage The storage to hold the buffers and events for blocked channels. * @param barrierHandler Handler that controls which channels are blocked. + * @param channelIndexOffset Optional offset added to channelIndex returned from the inputGate + * before passing it to the barrierHandler. */ public CheckpointedInputGate( InputGate inputGate, BufferStorage bufferStorage, - CheckpointBarrierHandler barrierHandler) { + CheckpointBarrierHandler barrierHandler, + int channelIndexOffset) { this.inputGate = inputGate; + this.channelIndexOffset = channelIndexOffset; this.bufferStorage = checkNotNull(bufferStorage); this.barrierHandler = barrierHandler; } @@ -138,7 +151,7 @@ public class CheckpointedInputGate implements AsyncDataInput<BufferOrEvent> { } BufferOrEvent bufferOrEvent = next.get(); - if (barrierHandler.isBlocked(bufferOrEvent.getChannelIndex())) { + if (barrierHandler.isBlocked(offsetChannelIndex(bufferOrEvent.getChannelIndex()))) { // if the channel is blocked, we just store the BufferOrEvent bufferStorage.add(bufferOrEvent); if (bufferStorage.isFull()) { @@ -153,7 +166,7 @@ public class CheckpointedInputGate implements AsyncDataInput<BufferOrEvent> { CheckpointBarrier checkpointBarrier = (CheckpointBarrier) bufferOrEvent.getEvent(); if (!endOfInputGate) { // process barriers only if there is a chance of the checkpoint completing - if (barrierHandler.processBarrier(checkpointBarrier, bufferOrEvent.getChannelIndex(), bufferStorage.getPendingBytes())) { + if (barrierHandler.processBarrier(checkpointBarrier, offsetChannelIndex(bufferOrEvent.getChannelIndex()), bufferStorage.getPendingBytes())) { bufferStorage.rollOver(); } } @@ -174,6 +187,10 @@ public class CheckpointedInputGate implements AsyncDataInput<BufferOrEvent> { } } + private int offsetChannelIndex(int channelIndex) { + return channelIndex + channelIndexOffset; + } + private Optional<BufferOrEvent> handleEmptyBuffer() throws Exception { if (!inputGate.isFinished()) { return Optional.empty(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java index 419cf16..800c33e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java @@ -29,6 +29,8 @@ import org.apache.flink.streaming.api.CheckpointingMode; import java.io.IOException; +import static org.apache.flink.util.Preconditions.checkState; + /** * Utility for creating {@link CheckpointedInputGate} based on checkpoint mode * for {@link StreamInputProcessor} and {@link StreamTwoInputProcessor}. @@ -51,6 +53,45 @@ public class InputProcessorUtil { return new CheckpointedInputGate(inputGate, bufferStorage, barrierHandler); } + /** + * @return a pair of {@link CheckpointedInputGate} created for two corresponding + * {@link InputGate}s supplied as parameters. + */ + public static CheckpointedInputGate[] createCheckpointedInputGatePair( + AbstractInvokable toNotifyOnCheckpoint, + CheckpointingMode checkpointMode, + IOManager ioManager, + InputGate inputGate1, + InputGate inputGate2, + Configuration taskManagerConfig, + String taskName) throws IOException { + + BufferStorage mainBufferStorage1 = createBufferStorage( + checkpointMode, ioManager, inputGate1.getPageSize(), taskManagerConfig, taskName); + BufferStorage mainBufferStorage2 = createBufferStorage( + checkpointMode, ioManager, inputGate2.getPageSize(), taskManagerConfig, taskName); + checkState(mainBufferStorage1.getMaxBufferedBytes() == mainBufferStorage2.getMaxBufferedBytes()); + + BufferStorage linkedBufferStorage1 = new LinkedBufferStorage( + mainBufferStorage1, + mainBufferStorage2, + mainBufferStorage1.getMaxBufferedBytes()); + BufferStorage linkedBufferStorage2 = new LinkedBufferStorage( + mainBufferStorage2, + mainBufferStorage1, + mainBufferStorage1.getMaxBufferedBytes()); + + CheckpointBarrierHandler barrierHandler = createCheckpointBarrierHandler( + checkpointMode, + inputGate1.getNumberOfInputChannels() + inputGate2.getNumberOfInputChannels(), + taskName, + toNotifyOnCheckpoint); + return new CheckpointedInputGate[] { + new CheckpointedInputGate(inputGate1, linkedBufferStorage1, barrierHandler), + new CheckpointedInputGate(inputGate2, linkedBufferStorage2, barrierHandler, inputGate1.getNumberOfInputChannels()) + }; + } + private static CheckpointBarrierHandler createCheckpointBarrierHandler( CheckpointingMode checkpointMode, int numberOfInputChannels, diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java index 37c17db..d5172ac 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java @@ -20,11 +20,13 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; +import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.operators.InputSelectable; import org.apache.flink.streaming.api.operators.InputSelection; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; @@ -36,6 +38,7 @@ import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve; import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; import org.apache.flink.streaming.runtime.tasks.OperatorChain; +import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.util.ExceptionUtils; import org.slf4j.Logger; @@ -101,13 +104,17 @@ public class StreamTwoInputSelectableProcessor<IN1, IN2> { Collection<InputGate> inputGates2, TypeSerializer<IN1> inputSerializer1, TypeSerializer<IN2> inputSerializer2, + StreamTask<?, ?> streamTask, + CheckpointingMode checkpointingMode, Object lock, IOManager ioManager, + Configuration taskManagerConfig, StreamStatusMaintainer streamStatusMaintainer, TwoInputStreamOperator<IN1, IN2, ?> streamOperator, WatermarkGauge input1WatermarkGauge, WatermarkGauge input2WatermarkGauge, - OperatorChain<?, ?> operatorChain) { + String taskName, + OperatorChain<?, ?> operatorChain) throws IOException { checkState(streamOperator instanceof InputSelectable); @@ -120,9 +127,17 @@ public class StreamTwoInputSelectableProcessor<IN1, IN2> { InputGate unionedInputGate2 = InputGateUtil.createInputGate(inputGates2.toArray(new InputGate[0])); // create a Input instance for each input - CachedBufferStorage bufferStorage = new CachedBufferStorage(unionedInputGate1.getPageSize()); - this.input1 = new StreamTaskNetworkInput(new CheckpointedInputGate(unionedInputGate1, bufferStorage, new CheckpointBarrierDiscarder()), inputSerializer1, ioManager, 0); - this.input2 = new StreamTaskNetworkInput(new CheckpointedInputGate(unionedInputGate2, bufferStorage, new CheckpointBarrierDiscarder()), inputSerializer2, ioManager, 1); + CheckpointedInputGate[] checkpointedInputGates = InputProcessorUtil.createCheckpointedInputGatePair( + streamTask, + checkpointingMode, + ioManager, + unionedInputGate1, + unionedInputGate2, + taskManagerConfig, + taskName); + checkState(checkpointedInputGates.length == 2); + this.input1 = new StreamTaskNetworkInput(checkpointedInputGates[0], inputSerializer1, ioManager, 0); + this.input2 = new StreamTaskNetworkInput(checkpointedInputGates[1], inputSerializer2, ioManager, 1); this.statusWatermarkValve1 = new StatusWatermarkValve( unionedInputGate1.getNumberOfInputChannels(), diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputSelectableStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputSelectableStreamTask.java index b577b20..cde5a5a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputSelectableStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputSelectableStreamTask.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.runtime.io.StreamTwoInputSelectableProcessor; +import java.io.IOException; import java.util.Collection; /** @@ -44,17 +45,21 @@ public class TwoInputSelectableStreamTask<IN1, IN2, OUT> extends AbstractTwoInpu Collection<InputGate> inputGates1, Collection<InputGate> inputGates2, TypeSerializer<IN1> inputDeserializer1, - TypeSerializer<IN2> inputDeserializer2) { + TypeSerializer<IN2> inputDeserializer2) throws IOException { this.inputProcessor = new StreamTwoInputSelectableProcessor<>( inputGates1, inputGates2, inputDeserializer1, inputDeserializer2, + this, + getConfiguration().getCheckpointMode(), getCheckpointLock(), getEnvironment().getIOManager(), + getEnvironment().getTaskManagerInfo().getConfiguration(), getStreamStatusMaintainer(), this.headOperator, input1WatermarkGauge, input2WatermarkGauge, + getTaskNameWithSubtaskAndId(), operatorChain); }
