This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d25609d  [FLINK-18139][checkpointing] Fixing unaligned checkpoints checks wrong channels for inflight data.
d25609d is described below

commit d25609d07f52ba9bac3f7bb33ce0635e255e3d9d
Author: Arvid Heise <[email protected]>
AuthorDate: Thu Jun 4 22:51:18 2020 +0200

    [FLINK-18139][checkpointing] Fixing unaligned checkpoints checks wrong channels for inflight data.
    
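    CheckpointBarrierUnaligner#hasInflightData was called with only a gate-local channel index and no information about the input gate. The unaligner uses that index directly into its flattened per-task array of channels. In multi-gate setups, the initial snapshot of in-flight data therefore always checked the same first few channels instead of the channels of the gate being snapshotted. The method now takes an InputChannelInfo (gate index plus channel index), which is mapped to the correct flattened channel index.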
---
 .../partition/consumer/SingleInputGateBuilder.java |  4 +-
 .../io/AlternatingCheckpointBarrierHandler.java    |  5 +-
 .../runtime/io/CheckpointBarrierHandler.java       |  3 +-
 .../runtime/io/CheckpointBarrierUnaligner.java     |  4 +-
 .../runtime/io/CheckpointedInputGate.java          |  5 +-
 .../AlternatingCheckpointBarrierHandlerTest.java   |  4 +-
 .../runtime/io/StreamTaskNetworkInputTest.java     | 73 +++++++++++++++++++++-
 7 files changed, 85 insertions(+), 13 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
index ad607f7..b279998 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
@@ -70,12 +70,12 @@ public class SingleInputGateBuilder {
                return this;
        }
 
-       SingleInputGateBuilder setConsumedSubpartitionIndex(int 
consumedSubpartitionIndex) {
+       public SingleInputGateBuilder setConsumedSubpartitionIndex(int 
consumedSubpartitionIndex) {
                this.consumedSubpartitionIndex = consumedSubpartitionIndex;
                return this;
        }
 
-       SingleInputGateBuilder setSingleInputGateIndex(int gateIndex) {
+       public SingleInputGateBuilder setSingleInputGateIndex(int gateIndex) {
                this.gateIndex = gateIndex;
                return this;
        }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandler.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandler.java
index 09f05b9..8ed6788 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandler.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandler.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.buffer.BufferReceivedListener;
@@ -99,9 +100,9 @@ class AlternatingCheckpointBarrierHandler extends 
CheckpointBarrierHandler {
        }
 
        @Override
-       public boolean hasInflightData(long checkpointId, int channelIndex) {
+       public boolean hasInflightData(long checkpointId, InputChannelInfo 
channelInfo) {
                // should only be called for unaligned checkpoint
-               return unalignedHandler.hasInflightData(checkpointId, 
channelIndex);
+               return unalignedHandler.hasInflightData(checkpointId, 
channelInfo);
        }
 
        @Override
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
index 07f6f7a..952af24 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
@@ -22,6 +22,7 @@ import 
org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.buffer.BufferReceivedListener;
@@ -88,7 +89,7 @@ public abstract class CheckpointBarrierHandler implements 
Closeable {
         * this method returns true iff the unaligner still expects the 
respective barrier to be <i>consumed</i> on the
         * that channel.
         */
-       public boolean hasInflightData(long checkpointId, int channelIndex) {
+       public boolean hasInflightData(long checkpointId, InputChannelInfo 
channelInfo) {
                return false;
        }
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
index 1d4bf82..0a6e661 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
@@ -223,14 +223,14 @@ public class CheckpointBarrierUnaligner extends 
CheckpointBarrierHandler {
        }
 
        @Override
-       public boolean hasInflightData(long checkpointId, int channelIndex) {
+       public boolean hasInflightData(long checkpointId, InputChannelInfo 
channelInfo) {
                if (checkpointId < currentConsumedCheckpointId) {
                        return false;
                }
                if (checkpointId > currentConsumedCheckpointId) {
                        return true;
                }
-               return hasInflightBuffers[channelIndex];
+               return 
hasInflightBuffers[getFlattenedChannelIndex(channelInfo)];
        }
 
        @Override
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
index 578cf21..1e2ccf3 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
@@ -140,8 +140,9 @@ public class CheckpointedInputGate implements 
PullingAsyncDataInput<BufferOrEven
                        long checkpointId,
                        int channelIndex,
                        ChannelStateWriter channelStateWriter) throws 
IOException {
-               if (barrierHandler.hasInflightData(checkpointId, channelIndex)) 
{
-                       
inputGate.getChannel(channelIndex).spillInflightBuffers(checkpointId, 
channelStateWriter);
+               InputChannel channel = inputGate.getChannel(channelIndex);
+               if (barrierHandler.hasInflightData(checkpointId, 
channel.getChannelInfo())) {
+                       channel.spillInflightBuffers(checkpointId, 
channelStateWriter);
                }
        }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandlerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandlerTest.java
index fdaacb5..d723d06 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandlerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandlerTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
@@ -107,7 +108,8 @@ public class AlternatingCheckpointBarrierHandlerTest {
 
        private static void assertInflightDataEquals(CheckpointBarrierHandler 
expected, CheckpointBarrierHandler actual, long barrierId, int numChannels) {
                for (int channelId = 0; channelId < numChannels; channelId++) {
-                       assertEquals(expected.hasInflightData(barrierId, 
channelId), actual.hasInflightData(barrierId, channelId));
+                       InputChannelInfo channelInfo = new InputChannelInfo(0, 
channelId);
+                       assertEquals(expected.hasInflightData(barrierId, 
channelInfo), actual.hasInflightData(barrierId, channelInfo));
                }
        }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
index 0237aef..0793bf1 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
@@ -22,17 +22,24 @@ import 
org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.core.io.InputStatus;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
 import 
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
 import 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
+import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
 import 
org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
 import org.apache.flink.runtime.operators.testutils.DummyCheckpointInvokable;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
@@ -55,6 +62,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.hamcrest.Matchers.is;
@@ -111,6 +119,67 @@ public class StreamTaskNetworkInputTest {
        }
 
        @Test
+       public void testSnapshotWithTwoInputGates() throws Exception {
+               CheckpointBarrierUnaligner unaligner = new 
CheckpointBarrierUnaligner(
+                               new int[]{ 1, 1 },
+                               ChannelStateWriter.NO_OP,
+                               "test",
+                               new DummyCheckpointInvokable());
+
+               SingleInputGate inputGate1 = new 
SingleInputGateBuilder().setSingleInputGateIndex(0).build();
+               RemoteInputChannel channel1 = 
InputChannelBuilder.newBuilder().buildRemoteChannel(inputGate1);
+               inputGate1.setInputChannels(channel1);
+               
inputGate1.registerBufferReceivedListener(unaligner.getBufferReceivedListener().get());
+               StreamTaskNetworkInput<Long> input1 = createInput(unaligner, 
inputGate1);
+
+               SingleInputGate inputGate2 = new 
SingleInputGateBuilder().setSingleInputGateIndex(1).build();
+               RemoteInputChannel channel2 = 
InputChannelBuilder.newBuilder().buildRemoteChannel(inputGate2);
+               inputGate2.setInputChannels(channel2);
+               
inputGate2.registerBufferReceivedListener(unaligner.getBufferReceivedListener().get());
+               StreamTaskNetworkInput<Long> input2 = createInput(unaligner, 
inputGate2);
+
+               CheckpointBarrier barrier = new CheckpointBarrier(0, 0L, 
CheckpointOptions.forCheckpointWithDefaultLocation());
+               channel1.onBuffer(EventSerializer.toBuffer(barrier), 0, 0);
+               channel1.onBuffer(BufferBuilderTestUtils.buildSomeBuffer(1), 1, 
0);
+
+               // all records on inputGate2 are now in-flight
+               channel2.onBuffer(BufferBuilderTestUtils.buildSomeBuffer(2), 0, 
0);
+               channel2.onBuffer(BufferBuilderTestUtils.buildSomeBuffer(3), 1, 
0);
+
+               // now snapshot all inflight buffers
+               RecordingChannelStateWriter channelStateWriter = new 
RecordingChannelStateWriter();
+               channelStateWriter.start(0, 
CheckpointOptions.forCheckpointWithDefaultLocation());
+               CompletableFuture<Void> completableFuture1 = 
input1.prepareSnapshot(channelStateWriter, 0);
+               CompletableFuture<Void> completableFuture2 = 
input2.prepareSnapshot(channelStateWriter, 0);
+
+               // finish unaligned checkpoint on input side
+               channel2.onBuffer(EventSerializer.toBuffer(barrier), 2, 0);
+
+               // futures should be completed
+               completableFuture1.join();
+               completableFuture2.join();
+
+               
assertEquals(channelStateWriter.getAddedInput().get(channel1.getChannelInfo()), 
Collections.emptyList());
+               List<Buffer> storedBuffers = 
channelStateWriter.getAddedInput().get(channel2.getChannelInfo());
+               assertEquals(Arrays.asList(2, 3), 
storedBuffers.stream().map(Buffer::getSize).collect(Collectors.toList()));
+       }
+
+       private StreamTaskNetworkInput<Long> 
createInput(CheckpointBarrierHandler handler, SingleInputGate inputGate) {
+               return new StreamTaskNetworkInput<>(
+                       new CheckpointedInputGate(inputGate, handler),
+                       LongSerializer.INSTANCE,
+                       new 
StatusWatermarkValve(inputGate.getNumberOfInputChannels(), new 
NoOpDataOutput<>()),
+                       inputGate.getGateIndex(),
+                       
createDeserializers(inputGate.getNumberOfInputChannels()));
+       }
+
+       private TestRecordDeserializer[] createDeserializers(int 
numberOfInputChannels) {
+               return IntStream.range(0, numberOfInputChannels)
+                               .mapToObj(index -> new 
TestRecordDeserializer(ioManager.getSpillingDirectoriesPaths()))
+                               .toArray(TestRecordDeserializer[]::new);
+       }
+
+       @Test
        public void testSnapshotAfterEndOfPartition() throws Exception {
                int numInputChannels = 1;
                int channelId = 0;
@@ -119,9 +188,7 @@ public class StreamTaskNetworkInputTest {
                VerifyRecordsDataOutput<Long> output = new 
VerifyRecordsDataOutput<>();
                LongSerializer inSerializer = LongSerializer.INSTANCE;
                StreamTestSingleInputGate<Long> inputGate = new 
StreamTestSingleInputGate<>(numInputChannels, 0, inSerializer, 1024);
-               TestRecordDeserializer[] deserializers = IntStream.range(0, 
numInputChannels)
-                       .mapToObj(index -> new 
TestRecordDeserializer(ioManager.getSpillingDirectoriesPaths()))
-                       .toArray(TestRecordDeserializer[]::new);
+               TestRecordDeserializer[] deserializers = 
createDeserializers(numInputChannels);
                StreamTaskNetworkInput<Long> input = new 
StreamTaskNetworkInput<>(
                        new CheckpointedInputGate(
                                inputGate.getInputGate(),

Reply via email to