This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c6786ab [FLINK-20654][checkpointing] Decline checkpoints until input channels are recovered
c6786ab is described below
commit c6786ab9cf7e40be41a5a9c12461d5e60a789195
Author: Roman Khachatryan <[email protected]>
AuthorDate: Thu Jan 7 16:27:21 2021 +0100
[FLINK-20654][checkpointing] Decline checkpoints until input channels are recovered
With regard to InputChannels, there are three cases in which a checkpoint has to be declined:
1. Channel state is not fully consumed
2. Channel has not yet been converted from Recovered to a normal channel
3. Channel has not yet been converted from Unknown to a normal channel
In the first case, a new checkpoint may skip some recovered buffers.
In the second and third cases, the checkpointStarted() call is not handled
by a normal channel, which can prevent incoming buffers from being captured.
In all of these cases, the new checkpoint would be inconsistent.
---
.../io/network/partition/consumer/IndexedInputGate.java | 5 -----
.../io/network/partition/consumer/RecoveredInputChannel.java | 8 ++++++++
.../io/network/partition/consumer/UnknownInputChannel.java | 8 ++++++++
.../network/partition/consumer/RecoveredInputChannelTest.java | 10 ++++++++++
.../io/network/partition/consumer/SingleInputGateTest.java | 10 ++++++++++
.../flink/test/checkpointing/UnalignedCheckpointITCase.java | 5 ++---
6 files changed, 38 insertions(+), 8 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java
index 686c015..be837f4 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java
@@ -23,8 +23,6 @@ import
org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import java.io.IOException;
-import static
org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY;
-
/** An {@link InputGate} with a specific index. */
public abstract class IndexedInputGate extends InputGate implements
CheckpointableInput {
/** Returns the index of this input gate. Only supported on */
@@ -32,9 +30,6 @@ public abstract class IndexedInputGate extends InputGate
implements Checkpointab
@Override
public void checkpointStarted(CheckpointBarrier barrier) throws
CheckpointException {
- if (!getStateConsumedFuture().isDone()) {
- throw new CheckpointException(CHECKPOINT_DECLINED_TASK_NOT_READY);
- }
for (int index = 0, numChannels = getNumberOfInputChannels();
index < numChannels;
index++) {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
index 81d64a7..eef4437 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
@@ -20,9 +20,11 @@ package
org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.event.TaskEvent;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.ChannelStateHolder;
@@ -40,6 +42,7 @@ import java.util.ArrayDeque;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import static
org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
@@ -239,4 +242,9 @@ public abstract class RecoveredInputChannel extends
InputChannel implements Chan
}
return bufferManager.requestBufferBlocking();
}
+
+ @Override
+ public void checkpointStarted(CheckpointBarrier barrier) throws
CheckpointException {
+ throw new CheckpointException(CHECKPOINT_DECLINED_TASK_NOT_READY);
+ }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
index 62aacbb..55b5dec 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
@@ -18,11 +18,13 @@
package org.apache.flink.runtime.io.network.partition.consumer;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.TaskEventPublisher;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
import org.apache.flink.runtime.io.network.partition.ChannelStateHolder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -34,6 +36,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Optional;
+import static
org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -164,4 +167,9 @@ class UnknownInputChannel extends InputChannel implements
ChannelStateHolder {
Preconditions.checkState(this.channelStateWriter == null);
this.channelStateWriter = channelStateWriter;
}
+
+ @Override
+ public void checkpointStarted(CheckpointBarrier barrier) throws
CheckpointException {
+ throw new CheckpointException(CHECKPOINT_DECLINED_TASK_NOT_READY);
+ }
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannelTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannelTest.java
index 843f0bb..f958f2d 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannelTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannelTest.java
@@ -19,12 +19,17 @@
package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.junit.Test;
import java.io.IOException;
+import static org.apache.flink.runtime.checkpoint.CheckpointOptions.unaligned;
+import static
org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault;
+
/** Tests for {@link RecoveredInputChannel}. */
public class RecoveredInputChannelTest {
@@ -38,6 +43,11 @@ public class RecoveredInputChannelTest {
buildChannel().requestSubpartition(0);
}
+ @Test(expected = CheckpointException.class)
+ public void testCheckpointStartImpossible() throws CheckpointException {
+ buildChannel().checkpointStarted(new CheckpointBarrier(0L, 0L,
unaligned(getDefault())));
+ }
+
private RecoveredInputChannel buildChannel() {
try {
return new RecoveredInputChannel(
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 68ccd00..55c4e5d 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -95,6 +95,16 @@ import static org.junit.Assert.fail;
public class SingleInputGateTest extends InputGateTestBase {
@Test(expected = CheckpointException.class)
+ public void testCheckpointsDeclinedUnlessAllChannelsAreKnown() throws
CheckpointException {
+ SingleInputGate gate =
+ createInputGate(createNettyShuffleEnvironment(), 1,
ResultPartitionType.PIPELINED);
+ gate.setInputChannels(
+ new
InputChannelBuilder().setChannelIndex(0).buildUnknownChannel(gate));
+ gate.checkpointStarted(
+ new CheckpointBarrier(1L, 1L, alignedNoTimeout(CHECKPOINT,
getDefault())));
+ }
+
+ @Test(expected = CheckpointException.class)
public void testCheckpointsDeclinedUnlessStateConsumed() throws
CheckpointException {
SingleInputGate gate =
createInputGate(createNettyShuffleEnvironment());
checkState(!gate.getStateConsumedFuture().isDone());
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
index 24027a6..b7be1e1 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
@@ -124,9 +124,8 @@ public class UnalignedCheckpointITCase extends
UnalignedCheckpointTestBase {
},
new Object[] {"Parallel cogroup, p = 5", createCogroupSettings(5)},
new Object[] {"Parallel cogroup, p = 10",
createCogroupSettings(10)},
- // todo: enable after completely fixing FLINK-20654
- // new Object[] {"Parallel union, p = 5",
createUnionSettings(5)},
- // new Object[] {"Parallel union, p = 10",
createUnionSettings(10)},
+ new Object[] {"Parallel union, p = 5", createUnionSettings(5)},
+ new Object[] {"Parallel union, p = 10", createUnionSettings(10)},
};
}