This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6c7a69a0442d6bfb3b5d86006db462756a51ec7c
Author: Piotr Nowojski <[email protected]>
AuthorDate: Tue Jun 18 17:26:47 2019 +0200

    [FLINK-12777][operator] Use CheckpointedInputGate 
StreamTwoInputSelectableProcessor
---
 .../runtime/io/CheckpointBarrierDiscarder.java     | 74 ----------------------
 .../runtime/io/CheckpointedInputGate.java          | 23 ++++++-
 .../streaming/runtime/io/InputProcessorUtil.java   | 41 ++++++++++++
 .../io/StreamTwoInputSelectableProcessor.java      | 23 +++++--
 .../tasks/TwoInputSelectableStreamTask.java        |  7 +-
 5 files changed, 86 insertions(+), 82 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierDiscarder.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierDiscarder.java
deleted file mode 100644
index 4c6cdab..0000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierDiscarder.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-
-import java.io.IOException;
-
-/**
- * The {@link CheckpointBarrierDiscarder} discards checkpoint barriers have 
been received from which input channels.
- */
-@Internal
-public class CheckpointBarrierDiscarder extends CheckpointBarrierHandler {
-       public CheckpointBarrierDiscarder() {
-               super(null);
-       }
-
-       @Override
-       public void releaseBlocksAndResetBarriers() throws IOException {
-       }
-
-       @Override
-       public boolean isBlocked(int channelIndex) {
-               return false;
-       }
-
-       @Override
-       public boolean processBarrier(CheckpointBarrier receivedBarrier, int 
channelIndex, long bufferedBytes) throws Exception {
-               return false;
-       }
-
-       @Override
-       public boolean processCancellationBarrier(CancelCheckpointMarker 
cancelBarrier) throws Exception {
-               return false;
-       }
-
-       @Override
-       public boolean processEndOfPartition() throws Exception {
-               return false;
-       }
-
-       @Override
-       public long getLatestCheckpointId() {
-               return 0;
-       }
-
-       @Override
-       public long getAlignmentDurationNanos() {
-               return 0;
-       }
-
-       @Override
-       public void checkpointSizeLimitExceeded(long maxBufferedBytes) throws 
Exception {
-
-       }
-}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
index 7604d0a..ce80e30 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
@@ -52,6 +52,8 @@ public class CheckpointedInputGate implements 
AsyncDataInput<BufferOrEvent> {
        /** The gate that the buffer draws its input from. */
        private final InputGate inputGate;
 
+       private final int channelIndexOffset;
+
        private final BufferStorage bufferStorage;
 
        /** Flag to indicate whether we have drawn all available input. */
@@ -89,6 +91,13 @@ public class CheckpointedInputGate implements 
AsyncDataInput<BufferOrEvent> {
                );
        }
 
+       public CheckpointedInputGate(
+                       InputGate inputGate,
+                       BufferStorage bufferStorage,
+                       CheckpointBarrierHandler barrierHandler) {
+               this(inputGate, bufferStorage, barrierHandler, 0);
+       }
+
        /**
         * Creates a new checkpoint stream aligner.
         *
@@ -99,12 +108,16 @@ public class CheckpointedInputGate implements 
AsyncDataInput<BufferOrEvent> {
         * @param inputGate The input gate to draw the buffers and events from.
         * @param bufferStorage The storage to hold the buffers and events for 
blocked channels.
         * @param barrierHandler Handler that controls which channels are 
blocked.
+        * @param channelIndexOffset Optional offset added to channelIndex 
returned from the inputGate
+        *                           before passing it to the barrierHandler.
         */
        public CheckpointedInputGate(
                        InputGate inputGate,
                        BufferStorage bufferStorage,
-                       CheckpointBarrierHandler barrierHandler) {
+                       CheckpointBarrierHandler barrierHandler,
+                       int channelIndexOffset) {
                this.inputGate = inputGate;
+               this.channelIndexOffset = channelIndexOffset;
                this.bufferStorage = checkNotNull(bufferStorage);
                this.barrierHandler = barrierHandler;
        }
@@ -138,7 +151,7 @@ public class CheckpointedInputGate implements 
AsyncDataInput<BufferOrEvent> {
                        }
 
                        BufferOrEvent bufferOrEvent = next.get();
-                       if 
(barrierHandler.isBlocked(bufferOrEvent.getChannelIndex())) {
+                       if 
(barrierHandler.isBlocked(offsetChannelIndex(bufferOrEvent.getChannelIndex()))) 
{
                                // if the channel is blocked, we just store the 
BufferOrEvent
                                bufferStorage.add(bufferOrEvent);
                                if (bufferStorage.isFull()) {
@@ -153,7 +166,7 @@ public class CheckpointedInputGate implements 
AsyncDataInput<BufferOrEvent> {
                                CheckpointBarrier checkpointBarrier = 
(CheckpointBarrier) bufferOrEvent.getEvent();
                                if (!endOfInputGate) {
                                        // process barriers only if there is a 
chance of the checkpoint completing
-                                       if 
(barrierHandler.processBarrier(checkpointBarrier, 
bufferOrEvent.getChannelIndex(), bufferStorage.getPendingBytes())) {
+                                       if 
(barrierHandler.processBarrier(checkpointBarrier, 
offsetChannelIndex(bufferOrEvent.getChannelIndex()), 
bufferStorage.getPendingBytes())) {
                                                bufferStorage.rollOver();
                                        }
                                }
@@ -174,6 +187,10 @@ public class CheckpointedInputGate implements 
AsyncDataInput<BufferOrEvent> {
                }
        }
 
+       private int offsetChannelIndex(int channelIndex) {
+               return channelIndex + channelIndexOffset;
+       }
+
        private Optional<BufferOrEvent> handleEmptyBuffer() throws Exception {
                if (!inputGate.isFinished()) {
                        return Optional.empty();
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
index 419cf16..800c33e 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
@@ -29,6 +29,8 @@ import org.apache.flink.streaming.api.CheckpointingMode;
 
 import java.io.IOException;
 
+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
  * Utility for creating {@link CheckpointedInputGate} based on checkpoint mode
  * for {@link StreamInputProcessor} and {@link StreamTwoInputProcessor}.
@@ -51,6 +53,45 @@ public class InputProcessorUtil {
                return new CheckpointedInputGate(inputGate, bufferStorage, 
barrierHandler);
        }
 
+       /**
+        * @return a pair of {@link CheckpointedInputGate} created for two 
corresponding
+        * {@link InputGate}s supplied as parameters.
+        */
+       public static CheckpointedInputGate[] createCheckpointedInputGatePair(
+                       AbstractInvokable toNotifyOnCheckpoint,
+                       CheckpointingMode checkpointMode,
+                       IOManager ioManager,
+                       InputGate inputGate1,
+                       InputGate inputGate2,
+                       Configuration taskManagerConfig,
+                       String taskName) throws IOException {
+
+               BufferStorage mainBufferStorage1 = createBufferStorage(
+                       checkpointMode, ioManager, inputGate1.getPageSize(), 
taskManagerConfig, taskName);
+               BufferStorage mainBufferStorage2 = createBufferStorage(
+                       checkpointMode, ioManager, inputGate2.getPageSize(), 
taskManagerConfig, taskName);
+               checkState(mainBufferStorage1.getMaxBufferedBytes() == 
mainBufferStorage2.getMaxBufferedBytes());
+
+               BufferStorage linkedBufferStorage1 = new LinkedBufferStorage(
+                       mainBufferStorage1,
+                       mainBufferStorage2,
+                       mainBufferStorage1.getMaxBufferedBytes());
+               BufferStorage linkedBufferStorage2 = new LinkedBufferStorage(
+                       mainBufferStorage2,
+                       mainBufferStorage1,
+                       mainBufferStorage1.getMaxBufferedBytes());
+
+               CheckpointBarrierHandler barrierHandler = 
createCheckpointBarrierHandler(
+                       checkpointMode,
+                       inputGate1.getNumberOfInputChannels() + 
inputGate2.getNumberOfInputChannels(),
+                       taskName,
+                       toNotifyOnCheckpoint);
+               return new CheckpointedInputGate[] {
+                       new CheckpointedInputGate(inputGate1, 
linkedBufferStorage1, barrierHandler),
+                       new CheckpointedInputGate(inputGate2, 
linkedBufferStorage2, barrierHandler, inputGate1.getNumberOfInputChannels())
+               };
+       }
+
        private static CheckpointBarrierHandler createCheckpointBarrierHandler(
                        CheckpointingMode checkpointMode,
                        int numberOfInputChannels,
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
index 37c17db..d5172ac 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
@@ -20,11 +20,13 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.operators.InputSelectable;
 import org.apache.flink.streaming.api.operators.InputSelection;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
@@ -36,6 +38,7 @@ import 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 import org.apache.flink.streaming.runtime.tasks.OperatorChain;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.slf4j.Logger;
@@ -101,13 +104,17 @@ public class StreamTwoInputSelectableProcessor<IN1, IN2> {
                Collection<InputGate> inputGates2,
                TypeSerializer<IN1> inputSerializer1,
                TypeSerializer<IN2> inputSerializer2,
+               StreamTask<?, ?> streamTask,
+               CheckpointingMode checkpointingMode,
                Object lock,
                IOManager ioManager,
+               Configuration taskManagerConfig,
                StreamStatusMaintainer streamStatusMaintainer,
                TwoInputStreamOperator<IN1, IN2, ?> streamOperator,
                WatermarkGauge input1WatermarkGauge,
                WatermarkGauge input2WatermarkGauge,
-               OperatorChain<?, ?> operatorChain) {
+               String taskName,
+               OperatorChain<?, ?> operatorChain) throws IOException {
 
                checkState(streamOperator instanceof InputSelectable);
 
@@ -120,9 +127,17 @@ public class StreamTwoInputSelectableProcessor<IN1, IN2> {
                InputGate unionedInputGate2 = 
InputGateUtil.createInputGate(inputGates2.toArray(new InputGate[0]));
 
                // create a Input instance for each input
-               CachedBufferStorage bufferStorage = new 
CachedBufferStorage(unionedInputGate1.getPageSize());
-               this.input1 = new StreamTaskNetworkInput(new 
CheckpointedInputGate(unionedInputGate1, bufferStorage, new 
CheckpointBarrierDiscarder()), inputSerializer1, ioManager, 0);
-               this.input2 = new StreamTaskNetworkInput(new 
CheckpointedInputGate(unionedInputGate2, bufferStorage, new 
CheckpointBarrierDiscarder()), inputSerializer2, ioManager, 1);
+               CheckpointedInputGate[] checkpointedInputGates = 
InputProcessorUtil.createCheckpointedInputGatePair(
+                       streamTask,
+                       checkpointingMode,
+                       ioManager,
+                       unionedInputGate1,
+                       unionedInputGate2,
+                       taskManagerConfig,
+                       taskName);
+               checkState(checkpointedInputGates.length == 2);
+               this.input1 = new 
StreamTaskNetworkInput(checkpointedInputGates[0], inputSerializer1, ioManager, 
0);
+               this.input2 = new 
StreamTaskNetworkInput(checkpointedInputGates[1], inputSerializer2, ioManager, 
1);
 
                this.statusWatermarkValve1 = new StatusWatermarkValve(
                        unionedInputGate1.getNumberOfInputChannels(),
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputSelectableStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputSelectableStreamTask.java
index b577b20..cde5a5a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputSelectableStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputSelectableStreamTask.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.io.StreamTwoInputSelectableProcessor;
 
+import java.io.IOException;
 import java.util.Collection;
 
 /**
@@ -44,17 +45,21 @@ public class TwoInputSelectableStreamTask<IN1, IN2, OUT> 
extends AbstractTwoInpu
                Collection<InputGate> inputGates1,
                Collection<InputGate> inputGates2,
                TypeSerializer<IN1> inputDeserializer1,
-               TypeSerializer<IN2> inputDeserializer2) {
+               TypeSerializer<IN2> inputDeserializer2) throws IOException {
 
                this.inputProcessor = new StreamTwoInputSelectableProcessor<>(
                        inputGates1, inputGates2,
                        inputDeserializer1, inputDeserializer2,
+                       this,
+                       getConfiguration().getCheckpointMode(),
                        getCheckpointLock(),
                        getEnvironment().getIOManager(),
+                       
getEnvironment().getTaskManagerInfo().getConfiguration(),
                        getStreamStatusMaintainer(),
                        this.headOperator,
                        input1WatermarkGauge,
                        input2WatermarkGauge,
+                       getTaskNameWithSubtaskAndId(),
                        operatorChain);
        }
 

Reply via email to