This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c6786ab  [FLINK-20654][checkpointing] Decline checkpoints until input 
channels are recovered
c6786ab is described below

commit c6786ab9cf7e40be41a5a9c12461d5e60a789195
Author: Roman Khachatryan <[email protected]>
AuthorDate: Thu Jan 7 16:27:21 2021 +0100

    [FLINK-20654][checkpointing] Decline checkpoints until input channels are 
recovered
    
    In regard to InputChannels, there are 3 cases when a checkpoint has to be 
declined:
    1. Channel state is not fully consumed
    2. Channel was not yet converted from Recovered to normal
    3. Channel was not yet converted from Unknown to normal
    
    In the 1st case, a new checkpoint may skip some recovered buffers.
    In the 2nd and 3rd cases, not handling the checkpointStarted() call
    by normal channels can prevent incoming buffers from being captured.
    In all these cases, the new checkpoint would be inconsistent.
---
 .../io/network/partition/consumer/IndexedInputGate.java        |  5 -----
 .../io/network/partition/consumer/RecoveredInputChannel.java   |  8 ++++++++
 .../io/network/partition/consumer/UnknownInputChannel.java     |  8 ++++++++
 .../network/partition/consumer/RecoveredInputChannelTest.java  | 10 ++++++++++
 .../io/network/partition/consumer/SingleInputGateTest.java     | 10 ++++++++++
 .../flink/test/checkpointing/UnalignedCheckpointITCase.java    |  5 ++---
 6 files changed, 38 insertions(+), 8 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java
index 686c015..be837f4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java
@@ -23,8 +23,6 @@ import 
org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 
 import java.io.IOException;
 
-import static 
org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY;
-
 /** An {@link InputGate} with a specific index. */
 public abstract class IndexedInputGate extends InputGate implements 
CheckpointableInput {
     /** Returns the index of this input gate. Only supported on */
@@ -32,9 +30,6 @@ public abstract class IndexedInputGate extends InputGate 
implements Checkpointab
 
     @Override
     public void checkpointStarted(CheckpointBarrier barrier) throws 
CheckpointException {
-        if (!getStateConsumedFuture().isDone()) {
-            throw new CheckpointException(CHECKPOINT_DECLINED_TASK_NOT_READY);
-        }
         for (int index = 0, numChannels = getNumberOfInputChannels();
                 index < numChannels;
                 index++) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
index 81d64a7..eef4437 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
@@ -20,9 +20,11 @@ package 
org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.event.TaskEvent;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.ChannelStateHolder;
@@ -40,6 +42,7 @@ import java.util.ArrayDeque;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
+import static 
org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -239,4 +242,9 @@ public abstract class RecoveredInputChannel extends 
InputChannel implements Chan
         }
         return bufferManager.requestBufferBlocking();
     }
+
+    @Override
+    public void checkpointStarted(CheckpointBarrier barrier) throws 
CheckpointException {
+        throw new CheckpointException(CHECKPOINT_DECLINED_TASK_NOT_READY);
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
index 62aacbb..55b5dec 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
@@ -18,11 +18,13 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
+import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.TaskEventPublisher;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
 import org.apache.flink.runtime.io.network.partition.ChannelStateHolder;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -34,6 +36,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.Optional;
 
+import static 
org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -164,4 +167,9 @@ class UnknownInputChannel extends InputChannel implements 
ChannelStateHolder {
         Preconditions.checkState(this.channelStateWriter == null);
         this.channelStateWriter = channelStateWriter;
     }
+
+    @Override
+    public void checkpointStarted(CheckpointBarrier barrier) throws 
CheckpointException {
+        throw new CheckpointException(CHECKPOINT_DECLINED_TASK_NOT_READY);
+    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannelTest.java
index 843f0bb..f958f2d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannelTest.java
@@ -19,12 +19,17 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 
 import org.junit.Test;
 
 import java.io.IOException;
 
+import static org.apache.flink.runtime.checkpoint.CheckpointOptions.unaligned;
+import static 
org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault;
+
 /** Tests for {@link RecoveredInputChannel}. */
 public class RecoveredInputChannelTest {
 
@@ -38,6 +43,11 @@ public class RecoveredInputChannelTest {
         buildChannel().requestSubpartition(0);
     }
 
+    @Test(expected = CheckpointException.class)
+    public void testCheckpointStartImpossible() throws CheckpointException {
+        buildChannel().checkpointStarted(new CheckpointBarrier(0L, 0L, 
unaligned(getDefault())));
+    }
+
     private RecoveredInputChannel buildChannel() {
         try {
             return new RecoveredInputChannel(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 68ccd00..55c4e5d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -95,6 +95,16 @@ import static org.junit.Assert.fail;
 public class SingleInputGateTest extends InputGateTestBase {
 
     @Test(expected = CheckpointException.class)
+    public void testCheckpointsDeclinedUnlessAllChannelsAreKnown() throws 
CheckpointException {
+        SingleInputGate gate =
+                createInputGate(createNettyShuffleEnvironment(), 1, 
ResultPartitionType.PIPELINED);
+        gate.setInputChannels(
+                new 
InputChannelBuilder().setChannelIndex(0).buildUnknownChannel(gate));
+        gate.checkpointStarted(
+                new CheckpointBarrier(1L, 1L, alignedNoTimeout(CHECKPOINT, 
getDefault())));
+    }
+
+    @Test(expected = CheckpointException.class)
     public void testCheckpointsDeclinedUnlessStateConsumed() throws 
CheckpointException {
         SingleInputGate gate = 
createInputGate(createNettyShuffleEnvironment());
         checkState(!gate.getStateConsumedFuture().isDone());
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
index 24027a6..b7be1e1 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
@@ -124,9 +124,8 @@ public class UnalignedCheckpointITCase extends 
UnalignedCheckpointTestBase {
             },
             new Object[] {"Parallel cogroup, p = 5", createCogroupSettings(5)},
             new Object[] {"Parallel cogroup, p = 10", 
createCogroupSettings(10)},
-            // todo: enable after completely  fixing FLINK-20654
-            //            new Object[] {"Parallel union, p = 5", 
createUnionSettings(5)},
-            //            new Object[] {"Parallel union, p = 10", 
createUnionSettings(10)},
+            new Object[] {"Parallel union, p = 5", createUnionSettings(5)},
+            new Object[] {"Parallel union, p = 10", createUnionSettings(10)},
         };
     }
 

Reply via email to