This is an automated email from the ASF dual-hosted git repository.
zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d25609d [FLINK-18139][checkpointing] Fixing unaligned checkpoints
checks wrong channels for inflight data.
d25609d is described below
commit d25609d07f52ba9bac3f7bb33ce0635e255e3d9d
Author: Arvid Heise <[email protected]>
AuthorDate: Thu Jun 4 22:51:18 2020 +0200
[FLINK-18139][checkpointing] Fixing unaligned checkpoints checks wrong
channels for inflight data.
CheckpointBarrierUnaligner#hasInflightData was called with a plain channel index
instead of input gate contextual information (InputChannelInfo), so that in
multi-gate setups only the same first few channels were checked during the
initial snapshotting of inflight data.
---
.../partition/consumer/SingleInputGateBuilder.java | 4 +-
.../io/AlternatingCheckpointBarrierHandler.java | 5 +-
.../runtime/io/CheckpointBarrierHandler.java | 3 +-
.../runtime/io/CheckpointBarrierUnaligner.java | 4 +-
.../runtime/io/CheckpointedInputGate.java | 5 +-
.../AlternatingCheckpointBarrierHandlerTest.java | 4 +-
.../runtime/io/StreamTaskNetworkInputTest.java | 73 +++++++++++++++++++++-
7 files changed, 85 insertions(+), 13 deletions(-)
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
index ad607f7..b279998 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
@@ -70,12 +70,12 @@ public class SingleInputGateBuilder {
return this;
}
- SingleInputGateBuilder setConsumedSubpartitionIndex(int
consumedSubpartitionIndex) {
+ public SingleInputGateBuilder setConsumedSubpartitionIndex(int
consumedSubpartitionIndex) {
this.consumedSubpartitionIndex = consumedSubpartitionIndex;
return this;
}
- SingleInputGateBuilder setSingleInputGateIndex(int gateIndex) {
+ public SingleInputGateBuilder setSingleInputGateIndex(int gateIndex) {
this.gateIndex = gateIndex;
return this;
}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandler.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandler.java
index 09f05b9..8ed6788 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandler.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandler.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.runtime.io;
import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.buffer.BufferReceivedListener;
@@ -99,9 +100,9 @@ class AlternatingCheckpointBarrierHandler extends
CheckpointBarrierHandler {
}
@Override
- public boolean hasInflightData(long checkpointId, int channelIndex) {
+ public boolean hasInflightData(long checkpointId, InputChannelInfo
channelInfo) {
// should only be called for unaligned checkpoint
- return unalignedHandler.hasInflightData(checkpointId,
channelIndex);
+ return unalignedHandler.hasInflightData(checkpointId,
channelInfo);
}
@Override
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
index 07f6f7a..952af24 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
@@ -22,6 +22,7 @@ import
org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.buffer.BufferReceivedListener;
@@ -88,7 +89,7 @@ public abstract class CheckpointBarrierHandler implements
Closeable {
* this method returns true iff the unaligner still expects the
respective barrier to be <i>consumed</i> on the
* that channel.
*/
- public boolean hasInflightData(long checkpointId, int channelIndex) {
+ public boolean hasInflightData(long checkpointId, InputChannelInfo
channelInfo) {
return false;
}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
index 1d4bf82..0a6e661 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
@@ -223,14 +223,14 @@ public class CheckpointBarrierUnaligner extends
CheckpointBarrierHandler {
}
@Override
- public boolean hasInflightData(long checkpointId, int channelIndex) {
+ public boolean hasInflightData(long checkpointId, InputChannelInfo
channelInfo) {
if (checkpointId < currentConsumedCheckpointId) {
return false;
}
if (checkpointId > currentConsumedCheckpointId) {
return true;
}
- return hasInflightBuffers[channelIndex];
+ return
hasInflightBuffers[getFlattenedChannelIndex(channelInfo)];
}
@Override
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
index 578cf21..1e2ccf3 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
@@ -140,8 +140,9 @@ public class CheckpointedInputGate implements
PullingAsyncDataInput<BufferOrEven
long checkpointId,
int channelIndex,
ChannelStateWriter channelStateWriter) throws
IOException {
- if (barrierHandler.hasInflightData(checkpointId, channelIndex))
{
-
inputGate.getChannel(channelIndex).spillInflightBuffers(checkpointId,
channelStateWriter);
+ InputChannel channel = inputGate.getChannel(channelIndex);
+ if (barrierHandler.hasInflightData(checkpointId,
channel.getChannelInfo())) {
+ channel.spillInflightBuffers(checkpointId,
channelStateWriter);
}
}
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandlerTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandlerTest.java
index fdaacb5..d723d06 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandlerTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandlerTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
@@ -107,7 +108,8 @@ public class AlternatingCheckpointBarrierHandlerTest {
private static void assertInflightDataEquals(CheckpointBarrierHandler
expected, CheckpointBarrierHandler actual, long barrierId, int numChannels) {
for (int channelId = 0; channelId < numChannels; channelId++) {
- assertEquals(expected.hasInflightData(barrierId,
channelId), actual.hasInflightData(barrierId, channelId));
+ InputChannelInfo channelInfo = new InputChannelInfo(0,
channelId);
+ assertEquals(expected.hasInflightData(barrierId,
channelInfo), actual.hasInflightData(barrierId, channelInfo));
}
}
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
index 0237aef..0793bf1 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
@@ -22,17 +22,24 @@ import
org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
import
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
import
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import
org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
+import
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import
org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
import org.apache.flink.runtime.operators.testutils.DummyCheckpointInvokable;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
@@ -55,6 +62,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.hamcrest.Matchers.is;
@@ -111,6 +119,67 @@ public class StreamTaskNetworkInputTest {
}
@Test
+ public void testSnapshotWithTwoInputGates() throws Exception {
+ CheckpointBarrierUnaligner unaligner = new
CheckpointBarrierUnaligner(
+ new int[]{ 1, 1 },
+ ChannelStateWriter.NO_OP,
+ "test",
+ new DummyCheckpointInvokable());
+
+ SingleInputGate inputGate1 = new
SingleInputGateBuilder().setSingleInputGateIndex(0).build();
+ RemoteInputChannel channel1 =
InputChannelBuilder.newBuilder().buildRemoteChannel(inputGate1);
+ inputGate1.setInputChannels(channel1);
+
inputGate1.registerBufferReceivedListener(unaligner.getBufferReceivedListener().get());
+ StreamTaskNetworkInput<Long> input1 = createInput(unaligner,
inputGate1);
+
+ SingleInputGate inputGate2 = new
SingleInputGateBuilder().setSingleInputGateIndex(1).build();
+ RemoteInputChannel channel2 =
InputChannelBuilder.newBuilder().buildRemoteChannel(inputGate2);
+ inputGate2.setInputChannels(channel2);
+
inputGate2.registerBufferReceivedListener(unaligner.getBufferReceivedListener().get());
+ StreamTaskNetworkInput<Long> input2 = createInput(unaligner,
inputGate2);
+
+ CheckpointBarrier barrier = new CheckpointBarrier(0, 0L,
CheckpointOptions.forCheckpointWithDefaultLocation());
+ channel1.onBuffer(EventSerializer.toBuffer(barrier), 0, 0);
+ channel1.onBuffer(BufferBuilderTestUtils.buildSomeBuffer(1), 1,
0);
+
+ // all records on inputGate2 are now in-flight
+ channel2.onBuffer(BufferBuilderTestUtils.buildSomeBuffer(2), 0,
0);
+ channel2.onBuffer(BufferBuilderTestUtils.buildSomeBuffer(3), 1,
0);
+
+ // now snapshot all inflight buffers
+ RecordingChannelStateWriter channelStateWriter = new
RecordingChannelStateWriter();
+ channelStateWriter.start(0,
CheckpointOptions.forCheckpointWithDefaultLocation());
+ CompletableFuture<Void> completableFuture1 =
input1.prepareSnapshot(channelStateWriter, 0);
+ CompletableFuture<Void> completableFuture2 =
input2.prepareSnapshot(channelStateWriter, 0);
+
+ // finish unaligned checkpoint on input side
+ channel2.onBuffer(EventSerializer.toBuffer(barrier), 2, 0);
+
+ // futures should be completed
+ completableFuture1.join();
+ completableFuture2.join();
+
+
assertEquals(channelStateWriter.getAddedInput().get(channel1.getChannelInfo()),
Collections.emptyList());
+ List<Buffer> storedBuffers =
channelStateWriter.getAddedInput().get(channel2.getChannelInfo());
+ assertEquals(Arrays.asList(2, 3),
storedBuffers.stream().map(Buffer::getSize).collect(Collectors.toList()));
+ }
+
+ private StreamTaskNetworkInput<Long>
createInput(CheckpointBarrierHandler handler, SingleInputGate inputGate) {
+ return new StreamTaskNetworkInput<>(
+ new CheckpointedInputGate(inputGate, handler),
+ LongSerializer.INSTANCE,
+ new
StatusWatermarkValve(inputGate.getNumberOfInputChannels(), new
NoOpDataOutput<>()),
+ inputGate.getGateIndex(),
+
createDeserializers(inputGate.getNumberOfInputChannels()));
+ }
+
+ private TestRecordDeserializer[] createDeserializers(int
numberOfInputChannels) {
+ return IntStream.range(0, numberOfInputChannels)
+ .mapToObj(index -> new
TestRecordDeserializer(ioManager.getSpillingDirectoriesPaths()))
+ .toArray(TestRecordDeserializer[]::new);
+ }
+
+ @Test
public void testSnapshotAfterEndOfPartition() throws Exception {
int numInputChannels = 1;
int channelId = 0;
@@ -119,9 +188,7 @@ public class StreamTaskNetworkInputTest {
VerifyRecordsDataOutput<Long> output = new
VerifyRecordsDataOutput<>();
LongSerializer inSerializer = LongSerializer.INSTANCE;
StreamTestSingleInputGate<Long> inputGate = new
StreamTestSingleInputGate<>(numInputChannels, 0, inSerializer, 1024);
- TestRecordDeserializer[] deserializers = IntStream.range(0,
numInputChannels)
- .mapToObj(index -> new
TestRecordDeserializer(ioManager.getSpillingDirectoriesPaths()))
- .toArray(TestRecordDeserializer[]::new);
+ TestRecordDeserializer[] deserializers =
createDeserializers(numInputChannels);
StreamTaskNetworkInput<Long> input = new
StreamTaskNetworkInput<>(
new CheckpointedInputGate(
inputGate.getInputGate(),