This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.12 by this push:
     new 45f8577  [FLINK-20654][checkpointing] Decline checkpoints until input channels are recovered
45f8577 is described below

commit 45f8577bd853ea8a7d1d35490c67b8b158b79499
Author: Roman Khachatryan <[email protected]>
AuthorDate: Thu Jan 7 16:27:21 2021 +0100

    [FLINK-20654][checkpointing] Decline checkpoints until input channels are recovered
    
    With regard to InputChannels, there are 3 cases in which a checkpoint has to be declined:
    1. Channel state is not fully consumed
    2. Channel was not yet converted from Recovered to normal
    3. Channel was not yet converted from Unknown to normal
    
    In the 1st case, a new checkpoint may skip some recovered buffers.
    In the 2nd and 3rd cases, the checkpointStarted() call is not yet handled
    by a normal channel, which can prevent incoming buffers from being captured.
    In all of these cases, the new checkpoint would be inconsistent.
---
 .../io/network/partition/consumer/IndexedInputGate.java  |  5 -----
 .../partition/consumer/RecoveredInputChannel.java        |  8 ++++++++
 .../network/partition/consumer/UnknownInputChannel.java  |  8 ++++++++
 .../partition/consumer/RecoveredInputChannelTest.java    | 16 ++++++++++++++++
 .../network/partition/consumer/SingleInputGateTest.java  | 10 ++++++++++
 .../test/checkpointing/UnalignedCheckpointITCase.java    |  5 ++---
 6 files changed, 44 insertions(+), 8 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java
index 18f1fb6..f87b6d5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java
@@ -21,8 +21,6 @@ import 
org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 
-import static 
org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY;
-
 /** An {@link InputGate} with a specific index. */
 public abstract class IndexedInputGate extends InputGate implements 
CheckpointableInput {
     /** Returns the index of this input gate. Only supported on */
@@ -30,9 +28,6 @@ public abstract class IndexedInputGate extends InputGate 
implements Checkpointab
 
     @Override
     public void checkpointStarted(CheckpointBarrier barrier) throws 
CheckpointException {
-        if (!getStateConsumedFuture().isDone()) {
-            throw new CheckpointException(CHECKPOINT_DECLINED_TASK_NOT_READY);
-        }
         for (int index = 0, numChannels = getNumberOfInputChannels();
                 index < numChannels;
                 index++) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
index 81d64a7..eef4437 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
@@ -20,9 +20,11 @@ package 
org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.event.TaskEvent;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.ChannelStateHolder;
@@ -40,6 +42,7 @@ import java.util.ArrayDeque;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
+import static 
org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -239,4 +242,9 @@ public abstract class RecoveredInputChannel extends 
InputChannel implements Chan
         }
         return bufferManager.requestBufferBlocking();
     }
+
+    @Override
+    public void checkpointStarted(CheckpointBarrier barrier) throws 
CheckpointException {
+        throw new CheckpointException(CHECKPOINT_DECLINED_TASK_NOT_READY);
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
index 62aacbb..55b5dec 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
@@ -18,11 +18,13 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
+import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.TaskEventPublisher;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
 import org.apache.flink.runtime.io.network.partition.ChannelStateHolder;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -34,6 +36,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.Optional;
 
+import static 
org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -164,4 +167,9 @@ class UnknownInputChannel extends InputChannel implements 
ChannelStateHolder {
         Preconditions.checkState(this.channelStateWriter == null);
         this.channelStateWriter = channelStateWriter;
     }
+
+    @Override
+    public void checkpointStarted(CheckpointBarrier barrier) throws 
CheckpointException {
+        throw new CheckpointException(CHECKPOINT_DECLINED_TASK_NOT_READY);
+    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannelTest.java
index 843f0bb..174f0ad 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannelTest.java
@@ -19,12 +19,18 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 
 import org.junit.Test;
 
 import java.io.IOException;
 
+import static 
org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault;
+
 /** Tests for {@link RecoveredInputChannel}. */
 public class RecoveredInputChannelTest {
 
@@ -38,6 +44,16 @@ public class RecoveredInputChannelTest {
         buildChannel().requestSubpartition(0);
     }
 
+    @Test(expected = CheckpointException.class)
+    public void testCheckpointStartImpossible() throws CheckpointException {
+        buildChannel()
+                .checkpointStarted(
+                        new CheckpointBarrier(
+                                0L,
+                                0L,
+                                new 
CheckpointOptions(CheckpointType.CHECKPOINT, getDefault())));
+    }
+
     private RecoveredInputChannel buildChannel() {
         try {
             return new RecoveredInputChannel(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 39a119f..924211c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -95,6 +95,16 @@ import static org.junit.Assert.fail;
 public class SingleInputGateTest extends InputGateTestBase {
 
     @Test(expected = CheckpointException.class)
+    public void testCheckpointsDeclinedUnlessAllChannelsAreKnown() throws 
CheckpointException {
+        SingleInputGate gate =
+                createInputGate(createNettyShuffleEnvironment(), 1, 
ResultPartitionType.PIPELINED);
+        gate.setInputChannels(
+                new 
InputChannelBuilder().setChannelIndex(0).buildUnknownChannel(gate));
+        gate.checkpointStarted(
+                new CheckpointBarrier(1L, 1L, new 
CheckpointOptions(CHECKPOINT, getDefault())));
+    }
+
+    @Test(expected = CheckpointException.class)
     public void testCheckpointsDeclinedUnlessStateConsumed() throws 
CheckpointException {
         SingleInputGate gate = 
createInputGate(createNettyShuffleEnvironment());
         checkState(!gate.getStateConsumedFuture().isDone());
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
index dc9e51a..4530bd5 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
@@ -120,9 +120,8 @@ public class UnalignedCheckpointITCase extends 
UnalignedCheckpointTestBase {
             },
             new Object[] {"Parallel cogroup, p = 5", createCogroupSettings(5)},
             new Object[] {"Parallel cogroup, p = 10", 
createCogroupSettings(10)},
-            // todo: enable after completely  fixing FLINK-20654
-            //            new Object[] {"Parallel union, p = 5", 
createUnionSettings(5)},
-            //            new Object[] {"Parallel union, p = 10", 
createUnionSettings(10)},
+            new Object[] {"Parallel union, p = 5", createUnionSettings(5)},
+            new Object[] {"Parallel union, p = 10", createUnionSettings(10)},
         };
     }
 

Reply via email to