This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.12 by this push:
new 45f8577 [FLINK-20654][checkpointing] Decline checkpoints until input channels are recovered
45f8577 is described below
commit 45f8577bd853ea8a7d1d35490c67b8b158b79499
Author: Roman Khachatryan <[email protected]>
AuthorDate: Thu Jan 7 16:27:21 2021 +0100
[FLINK-20654][checkpointing] Decline checkpoints until input channels are recovered
With regard to InputChannels, there are 3 cases in which a checkpoint has to be declined:
1. Channel state is not fully consumed
2. Channel was not yet converted from Recovered to normal
3. Channel was not yet converted from Unknown to normal
In the 1st case, a new checkpoint may skip some recovered buffers.
In the 2nd and 3rd cases, the checkpointStarted() call is not yet handled
by a normal channel, which can prevent incoming buffers from being captured.
In all these cases, the new checkpoint would be inconsistent.
---
.../io/network/partition/consumer/IndexedInputGate.java | 5 -----
.../partition/consumer/RecoveredInputChannel.java | 8 ++++++++
.../network/partition/consumer/UnknownInputChannel.java | 8 ++++++++
.../partition/consumer/RecoveredInputChannelTest.java | 16 ++++++++++++++++
.../network/partition/consumer/SingleInputGateTest.java | 10 ++++++++++
.../test/checkpointing/UnalignedCheckpointITCase.java | 5 ++---
6 files changed, 44 insertions(+), 8 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java
index 18f1fb6..f87b6d5 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java
@@ -21,8 +21,6 @@ import
org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-import static
org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY;
-
/** An {@link InputGate} with a specific index. */
public abstract class IndexedInputGate extends InputGate implements
CheckpointableInput {
/** Returns the index of this input gate. Only supported on */
@@ -30,9 +28,6 @@ public abstract class IndexedInputGate extends InputGate
implements Checkpointab
@Override
public void checkpointStarted(CheckpointBarrier barrier) throws
CheckpointException {
- if (!getStateConsumedFuture().isDone()) {
- throw new CheckpointException(CHECKPOINT_DECLINED_TASK_NOT_READY);
- }
for (int index = 0, numChannels = getNumberOfInputChannels();
index < numChannels;
index++) {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
index 81d64a7..eef4437 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
@@ -20,9 +20,11 @@ package
org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.event.TaskEvent;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.ChannelStateHolder;
@@ -40,6 +42,7 @@ import java.util.ArrayDeque;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import static
org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
@@ -239,4 +242,9 @@ public abstract class RecoveredInputChannel extends
InputChannel implements Chan
}
return bufferManager.requestBufferBlocking();
}
+
+ @Override
+ public void checkpointStarted(CheckpointBarrier barrier) throws
CheckpointException {
+ throw new CheckpointException(CHECKPOINT_DECLINED_TASK_NOT_READY); // always decline: channel is not yet converted from Recovered to normal, so a checkpoint now would be inconsistent
+ }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
index 62aacbb..55b5dec 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
@@ -18,11 +18,13 @@
package org.apache.flink.runtime.io.network.partition.consumer;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.TaskEventPublisher;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
import org.apache.flink.runtime.io.network.partition.ChannelStateHolder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -34,6 +36,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Optional;
+import static
org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -164,4 +167,9 @@ class UnknownInputChannel extends InputChannel implements
ChannelStateHolder {
Preconditions.checkState(this.channelStateWriter == null);
this.channelStateWriter = channelStateWriter;
}
+
+ @Override
+ public void checkpointStarted(CheckpointBarrier barrier) throws
CheckpointException {
+ throw new CheckpointException(CHECKPOINT_DECLINED_TASK_NOT_READY); // always decline: channel is not yet converted from Unknown to normal, so incoming buffers might not be captured
+ }
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannelTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannelTest.java
index 843f0bb..174f0ad 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannelTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannelTest.java
@@ -19,12 +19,18 @@
package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.junit.Test;
import java.io.IOException;
+import static
org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault;
+
/** Tests for {@link RecoveredInputChannel}. */
public class RecoveredInputChannelTest {
@@ -38,6 +44,16 @@ public class RecoveredInputChannelTest {
buildChannel().requestSubpartition(0);
}
+ @Test(expected = CheckpointException.class)
+ public void testCheckpointStartImpossible() throws CheckpointException { // a RecoveredInputChannel must decline any checkpoint barrier
+ buildChannel()
+ .checkpointStarted(
+ new CheckpointBarrier(
+ 0L,
+ 0L,
+ new
CheckpointOptions(CheckpointType.CHECKPOINT, getDefault())));
+ }
+
private RecoveredInputChannel buildChannel() {
try {
return new RecoveredInputChannel(
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 39a119f..924211c 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -95,6 +95,16 @@ import static org.junit.Assert.fail;
public class SingleInputGateTest extends InputGateTestBase {
@Test(expected = CheckpointException.class)
+ public void testCheckpointsDeclinedUnlessAllChannelsAreKnown() throws
CheckpointException {
+ SingleInputGate gate =
+ createInputGate(createNettyShuffleEnvironment(), 1,
ResultPartitionType.PIPELINED);
+ gate.setInputChannels( // the gate's only channel is still Unknown, so checkpointStarted below must throw
+ new
InputChannelBuilder().setChannelIndex(0).buildUnknownChannel(gate));
+ gate.checkpointStarted(
+ new CheckpointBarrier(1L, 1L, new
CheckpointOptions(CHECKPOINT, getDefault())));
+ }
+
+ @Test(expected = CheckpointException.class)
public void testCheckpointsDeclinedUnlessStateConsumed() throws
CheckpointException {
SingleInputGate gate =
createInputGate(createNettyShuffleEnvironment());
checkState(!gate.getStateConsumedFuture().isDone());
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
index dc9e51a..4530bd5 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
@@ -120,9 +120,8 @@ public class UnalignedCheckpointITCase extends
UnalignedCheckpointTestBase {
},
new Object[] {"Parallel cogroup, p = 5", createCogroupSettings(5)},
new Object[] {"Parallel cogroup, p = 10",
createCogroupSettings(10)},
- // todo: enable after completely fixing FLINK-20654
- // new Object[] {"Parallel union, p = 5",
createUnionSettings(5)},
- // new Object[] {"Parallel union, p = 10",
createUnionSettings(10)},
+ new Object[] {"Parallel union, p = 5", createUnionSettings(5)},
+ new Object[] {"Parallel union, p = 10", createUnionSettings(10)},
};
}