This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d1997b8  [FLINK-24667][runtime] Fix error handling in ChannelStateCheckpointWriter
d1997b8 is described below

commit d1997b827a0e21308c57450dd7a6df1e8efa5bce
Author: Roman Khachatryan <[email protected]>
AuthorDate: Fri Oct 29 09:31:10 2021 +0200

    [FLINK-24667][runtime] Fix error handling in ChannelStateCheckpointWriter
    
    - Don't propagate single checkpoint failure to prevent exiting from
      ChannelStateWriteRequestExecutorImpl loop
    - Handle error during closing the stream
    - Swap suppressed exceptions to prevent them from piling up in the
      stacktrace
---
 .../channel/ChannelStateCheckpointWriter.java      |  34 ++-
 .../channel/ChannelStateWriteRequest.java          |   5 +-
 .../ChannelStateWriteRequestExecutorImpl.java      |   7 +-
 .../state/ttl/mock/MockKeyedStateBackend.java      |  17 +-
 .../ttl/mock/MockKeyedStateBackendBuilder.java     |  10 +-
 .../state/ttl/mock/MockOperatorStateBackend.java   | 100 +++++++
 .../runtime/state/ttl/mock/MockStateBackend.java   |  14 +-
 .../UnalignedCheckpointFailureHandlingITCase.java  | 313 +++++++++++++++++++++
 8 files changed, 474 insertions(+), 26 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java
index 368bb04..f84cf50 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java
@@ -51,6 +51,8 @@ import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
 import static java.util.UUID.randomUUID;
 import static org.apache.flink.runtime.state.CheckpointedStateScope.EXCLUSIVE;
+import static org.apache.flink.util.ExceptionUtils.findThrowable;
+import static org.apache.flink.util.ExceptionUtils.rethrow;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -99,8 +101,7 @@ class ChannelStateCheckpointWriter {
             ChannelStateWriteResult result,
             CheckpointStateOutputStream stream,
             ChannelStateSerializer serializer,
-            RunnableWithException onComplete)
-            throws Exception {
+            RunnableWithException onComplete) {
         this(
                 taskName,
                 subtaskIndex,
@@ -121,8 +122,7 @@ class ChannelStateCheckpointWriter {
             ChannelStateSerializer serializer,
             RunnableWithException onComplete,
             CheckpointStateOutputStream checkpointStateOutputStream,
-            DataOutputStream dataStream)
-            throws Exception {
+            DataOutputStream dataStream) {
         this.taskName = taskName;
         this.subtaskIndex = subtaskIndex;
         this.checkpointId = checkpointId;
@@ -134,7 +134,7 @@ class ChannelStateCheckpointWriter {
         runWithChecks(() -> serializer.writeHeader(dataStream));
     }
 
-    void writeInput(InputChannelInfo info, Buffer buffer) throws Exception {
+    void writeInput(InputChannelInfo info, Buffer buffer) {
         write(
                 inputChannelOffsets,
                 info,
@@ -143,7 +143,7 @@ class ChannelStateCheckpointWriter {
                 "ChannelStateCheckpointWriter#writeInput");
     }
 
-    void writeOutput(ResultSubpartitionInfo info, Buffer buffer) throws 
Exception {
+    void writeOutput(ResultSubpartitionInfo info, Buffer buffer) {
         write(
                 resultSubpartitionOffsets,
                 info,
@@ -157,8 +157,7 @@ class ChannelStateCheckpointWriter {
             K key,
             Buffer buffer,
             boolean precondition,
-            String action)
-            throws Exception {
+            String action) {
         try {
             if (result.isDone()) {
                 return;
@@ -290,19 +289,30 @@ class ChannelStateCheckpointWriter {
         }
     }
 
-    private void runWithChecks(RunnableWithException r) throws Exception {
+    private void runWithChecks(RunnableWithException r) {
         try {
             checkState(!result.isDone(), "result is already completed", 
result);
             r.run();
         } catch (Exception e) {
             fail(e);
-            throw e;
+            if (!findThrowable(e, IOException.class).isPresent()) {
+                rethrow(e);
+            }
         }
     }
 
-    public void fail(Throwable e) throws Exception {
+    public void fail(Throwable e) {
         result.fail(e);
-        checkpointStream.close();
+        try {
+            checkpointStream.close();
+        } catch (Exception closeException) {
+            String message = "Unable to close checkpointStream after a failure";
+            if (findThrowable(closeException, IOException.class).isPresent()) {
+                LOG.warn(message, closeException);
+            } else {
+                throw new RuntimeException(message, closeException);
+            }
+        }
     }
 
     private interface HandleFactory<I, H extends 
AbstractChannelStateHandle<I>> {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java
index 17be93b..d8d1923 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java
@@ -22,10 +22,10 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.function.BiConsumerWithException;
 import org.apache.flink.util.function.ThrowingConsumer;
 
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
 
 import static 
org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequestState.CANCELLED;
 import static 
org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequestState.COMPLETED;
@@ -76,8 +76,7 @@ interface ChannelStateWriteRequest {
             long checkpointId,
             String name,
             CloseableIterator<Buffer> iterator,
-            BiConsumerWithException<ChannelStateCheckpointWriter, Buffer, 
Exception>
-                    bufferConsumer) {
+            BiConsumer<ChannelStateCheckpointWriter, Buffer> bufferConsumer) {
         return new CheckpointInProgressRequest(
                 name,
                 checkpointId,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java
index 08fb2d5..5c12a77 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java
@@ -149,8 +149,11 @@ class ChannelStateWriteRequestExecutorImpl implements 
ChannelStateWriteRequestEx
         // checking before is not enough because (check + enqueue) is not 
atomic
         if (wasClosed || !thread.isAlive()) {
             cleanupRequests();
-            throw ExceptionUtils.firstOrSuppressed(
-                    new IllegalStateException("not running"), thrown);
+            IllegalStateException exception = new IllegalStateException("not running");
+            if (thrown != null) {
+                exception.addSuppressed(thrown);
+            }
+            throw exception;
         }
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
index 2f1aab4..da7e74c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
@@ -66,6 +66,9 @@ import java.util.stream.Stream;
 /** State backend which produces in memory mock state objects. */
 public class MockKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
+    /** Whether to create empty snapshot ({@link MockKeyedStateHandle} isn't 
recognized by JM). */
+    private final boolean emptySnapshot;
+
     private interface StateFactory {
         <N, SV, S extends State, IS extends S> IS createInternalState(
                 TypeSerializer<N> namespaceSerializer, StateDescriptor<S, SV> 
stateDesc)
@@ -105,7 +108,8 @@ public class MockKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
             Map<String, Map<K, Map<Object, Object>>> stateValues,
             Map<String, StateSnapshotTransformer<Object>> stateSnapshotFilters,
             CloseableRegistry cancelStreamRegistry,
-            InternalKeyContext<K> keyContext) {
+            InternalKeyContext<K> keyContext,
+            boolean emptySnapshot) {
         super(
                 kvStateRegistry,
                 keySerializer,
@@ -117,6 +121,7 @@ public class MockKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                 keyContext);
         this.stateValues = stateValues;
         this.stateSnapshotFilters = stateSnapshotFilters;
+        this.emptySnapshot = emptySnapshot;
     }
 
     @Override
@@ -221,9 +226,11 @@ public class MockKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
             @Nonnull CheckpointOptions checkpointOptions) {
         return new FutureTask<>(
                 () ->
-                        SnapshotResult.of(
-                                new MockKeyedStateHandle<>(
-                                        copy(stateValues, 
stateSnapshotFilters))));
+                        emptySnapshot
+                                ? SnapshotResult.empty()
+                                : SnapshotResult.of(
+                                        new MockKeyedStateHandle<>(
+                                                copy(stateValues, 
stateSnapshotFilters))));
     }
 
     @Nonnull
@@ -309,7 +316,7 @@ public class MockKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
         @Override
         public long getStateSize() {
-            throw new UnsupportedOperationException();
+            return 0; // state size unknown
         }
 
         @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackendBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackendBuilder.java
index 4d76286..a801aa9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackendBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackendBuilder.java
@@ -43,6 +43,9 @@ import java.util.Map;
  * @param <K> The data type that the key serializer serializes.
  */
 public class MockKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBackendBuilder<K> {
+
+    private final boolean emptySnapshot;
+
     public MockKeyedStateBackendBuilder(
             TaskKvStateRegistry kvStateRegistry,
             TypeSerializer<K> keySerializer,
@@ -54,7 +57,8 @@ public class MockKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBackendBu
             LatencyTrackingStateConfig latencyTrackingStateConfig,
             @Nonnull Collection<KeyedStateHandle> stateHandles,
             StreamCompressionDecorator keyGroupCompressionDecorator,
-            CloseableRegistry cancelStreamRegistry) {
+            CloseableRegistry cancelStreamRegistry,
+            boolean emptySnapshot) {
         super(
                 kvStateRegistry,
                 keySerializer,
@@ -67,6 +71,7 @@ public class MockKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBackendBu
                 stateHandles,
                 keyGroupCompressionDecorator,
                 cancelStreamRegistry);
+        this.emptySnapshot = emptySnapshot;
     }
 
     @Override
@@ -86,6 +91,7 @@ public class MockKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBackendBu
                 stateValues,
                 stateSnapshotFilters,
                 cancelStreamRegistry,
-                new InternalKeyContextImpl<>(keyGroupRange, 
numberOfKeyGroups));
+                new InternalKeyContextImpl<>(keyGroupRange, numberOfKeyGroups),
+                emptySnapshot);
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockOperatorStateBackend.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockOperatorStateBackend.java
new file mode 100644
index 0000000..046b7a6
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockOperatorStateBackend.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl.mock;
+
+import org.apache.flink.api.common.state.BroadcastState;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RunnableFuture;
+
+class MockOperatorStateBackend implements OperatorStateBackend {
+
+    private final HashSet<String> registeredStateNames = new HashSet<>();
+    private final boolean emptySnapshot;
+
+    public MockOperatorStateBackend(boolean emptySnapshot) {
+        this.emptySnapshot = emptySnapshot;
+    }
+
+    @Override
+    public <S> ListState<S> getListState(ListStateDescriptor<S> 
stateDescriptor) throws Exception {
+        registeredStateNames.add(stateDescriptor.getName());
+        ListState<S> state =
+                MockInternalListState.createState(
+                        stateDescriptor.getElementSerializer(), 
stateDescriptor);
+        ((MockInternalKvState) state).values = HashMap::new;
+        return state;
+    }
+
+    @Override
+    public <S> ListState<S> getUnionListState(ListStateDescriptor<S> 
stateDescriptor)
+            throws Exception {
+        return getListState(stateDescriptor);
+    }
+
+    @Override
+    public <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, 
V> stateDescriptor)
+            throws Exception {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Set<String> getRegisteredStateNames() {
+        return registeredStateNames;
+    }
+
+    @Override
+    public Set<String> getRegisteredBroadcastStateNames() {
+        return Collections.emptySet();
+    }
+
+    @Nonnull
+    @Override
+    public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
+            long checkpointId,
+            long timestamp,
+            @Nonnull CheckpointStreamFactory streamFactory,
+            @Nonnull CheckpointOptions checkpointOptions)
+            throws Exception {
+        if (!emptySnapshot) {
+            throw new UnsupportedOperationException();
+        }
+        return new FutureTask<>(SnapshotResult::empty);
+    }
+
+    @Override
+    public void dispose() {}
+
+    @Override
+    public void close() throws IOException {}
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java
index 3ad4bfc..d94644c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java
@@ -42,6 +42,15 @@ import java.util.Collection;
 /** mack state backend. */
 public class MockStateBackend extends AbstractStateBackend {
     private static final long serialVersionUID = 995676510267499393L;
+    private final boolean emptySnapshot;
+
+    public MockStateBackend() {
+        this(false);
+    }
+
+    public MockStateBackend(boolean emptySnapshot) {
+        this.emptySnapshot = emptySnapshot;
+    }
 
     @Override
     public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
@@ -67,7 +76,8 @@ public class MockStateBackend extends AbstractStateBackend {
                         LatencyTrackingStateConfig.disabled(),
                         stateHandles,
                         
AbstractStateBackend.getCompressionDecorator(env.getExecutionConfig()),
-                        cancelStreamRegistry)
+                        cancelStreamRegistry,
+                        emptySnapshot)
                 .build();
     }
 
@@ -77,6 +87,6 @@ public class MockStateBackend extends AbstractStateBackend {
             String operatorIdentifier,
             @Nonnull Collection<OperatorStateHandle> stateHandles,
             CloseableRegistry cancelStreamRegistry) {
-        throw new UnsupportedOperationException();
+        return new MockOperatorStateBackend(emptySnapshot);
     }
 }
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java
new file mode 100644
index 0000000..15252c7
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream;
+import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
+import org.apache.flink.runtime.state.ttl.mock.MockStateBackend;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.SharedObjects;
+import org.apache.flink.testutils.junit.SharedReference;
+import org.apache.flink.util.SerializedThrowable;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static java.util.Collections.singletonList;
+import static org.apache.flink.api.common.JobStatus.RUNNING;
+import static org.apache.flink.api.common.time.Deadline.fromNow;
+import static org.apache.flink.core.fs.Path.fromLocalFile;
+import static 
org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
+import static 
org.apache.flink.runtime.testutils.CommonTestUtils.waitForJobStatus;
+import static org.apache.flink.util.ExceptionUtils.findThrowable;
+import static org.apache.flink.util.ExceptionUtils.rethrow;
+
+/**
+ * Tests failure handling in channel state persistence.
+ *
+ * @see <a href="https://issues.apache.org/jira/browse/FLINK-24667">FLINK-24667</a>
+ */
+public class UnalignedCheckpointFailureHandlingITCase {
+
+    private static final int PARALLELISM = 2;
+
+    @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    @Rule public final SharedObjects sharedObjects = SharedObjects.create();
+
+    @Rule
+    public final MiniClusterWithClientResource miniClusterResource =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(PARALLELISM)
+                            .setNumberSlotsPerTaskManager(1)
+                            .build());
+
+    @Test
+    public void testCheckpointSuccessAfterFailure() throws Exception {
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        TestCheckpointStorage storage =
+                new TestCheckpointStorage(
+                        new JobManagerCheckpointStorage(), sharedObjects, 
temporaryFolder);
+
+        configure(env, storage);
+        buildGraph(env);
+
+        JobClient jobClient = env.executeAsync();
+        JobID jobID = jobClient.getJobID();
+        MiniCluster miniCluster = miniClusterResource.getMiniCluster();
+
+        waitForJobStatus(jobClient, singletonList(RUNNING), 
fromNow(Duration.ofSeconds(30)));
+        waitForAllTaskRunning(miniCluster, jobID, false);
+
+        triggerFailingCheckpoint(jobID, TestException.class, miniCluster);
+
+        miniCluster.triggerCheckpoint(jobID).get();
+    }
+
+    private void configure(StreamExecutionEnvironment env, 
TestCheckpointStorage storage) {
+        // enable checkpointing but only via API
+        env.enableCheckpointing(Long.MAX_VALUE, 
CheckpointingMode.EXACTLY_ONCE);
+
+        env.getCheckpointConfig().setCheckpointStorage(storage);
+
+        // use non-snapshotting backend to test channel state persistence 
integration with
+        // checkpoint storage
+        env.setStateBackend(new MockStateBackend(true));
+
+        env.getCheckpointConfig().enableUnalignedCheckpoints();
+
+        env.getCheckpointConfig().setAlignedCheckpointTimeout(Duration.ZERO); 
// speed-up
+
+        // failures are emitted by the storage
+        
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);
+
+        // DoP > 1 is required for some barriers to lag
+        env.setParallelism(PARALLELISM);
+
+        // no chaining to have input channels (doesn't matter local or remote)
+        env.disableOperatorChaining();
+    }
+
+    private void buildGraph(StreamExecutionEnvironment env) {
+        // with zero alignment timeout some steps here are not strictly 
necessary currently, but
+        // this may change in the future
+        env.fromSource(
+                        new NumberSequenceSource(0, Long.MAX_VALUE),
+                        WatermarkStrategy.noWatermarks(),
+                        "num-source")
+                // source is not parallel, so keyBy to send to all down-streams
+                .keyBy(value -> value)
+                // exert back-pressure
+                .map(
+                        value -> {
+                            Thread.sleep(1);
+                            return value;
+                        })
+                .addSink(new DiscardingSink<>());
+    }
+
+    private void triggerFailingCheckpoint(
+            JobID jobID, Class<TestException> expectedException, MiniCluster 
miniCluster)
+            throws InterruptedException, ExecutionException {
+        while (true) {
+            Optional<Throwable> cpFailure =
+                    miniCluster
+                            .triggerCheckpoint(jobID)
+                            .thenApply(ign -> Optional.empty())
+                            .handle((ign, err) -> Optional.ofNullable(err))
+                            .get();
+            if (!cpFailure.isPresent()) {
+                Thread.sleep(50); // trigger again - in case of no channel 
data was written
+            } else if (isCausedBy(cpFailure.get(), expectedException)) {
+                return;
+            } else {
+                rethrow(cpFailure.get());
+            }
+        }
+    }
+
+    private boolean isCausedBy(Throwable t, Class<TestException> 
expectedException) {
+        return findThrowable(t, SerializedThrowable.class)
+                .flatMap(
+                        st -> {
+                            Throwable deser = 
st.deserializeError(getClass().getClassLoader());
+                            return findThrowable(deser, expectedException);
+                        })
+                .isPresent();
+    }
+
+    private static class TestCheckpointStorage implements CheckpointStorage {
+        private final CheckpointStorage delegate;
+        private final SharedReference<AtomicBoolean> failOnCloseRef;
+        private final SharedReference<TemporaryFolder> tempFolderRef;
+
+        private TestCheckpointStorage(
+                CheckpointStorage delegate,
+                SharedObjects sharedObjects,
+                TemporaryFolder tempFolder) {
+            this.delegate = delegate;
+            this.failOnCloseRef = sharedObjects.add(new AtomicBoolean(true));
+            this.tempFolderRef = sharedObjects.add(tempFolder);
+        }
+
+        @Override
+        public CheckpointStorageAccess createCheckpointStorage(JobID jobId) 
throws IOException {
+            return new TestCheckpointStorageAccess(
+                    delegate.createCheckpointStorage(jobId),
+                    failOnCloseRef.get(),
+                    tempFolderRef.get().newFolder());
+        }
+
+        @Override
+        public CompletedCheckpointStorageLocation resolveCheckpoint(String 
externalPointer)
+                throws IOException {
+            return delegate.resolveCheckpoint(externalPointer);
+        }
+    }
+
+    private static class TestCheckpointStorageAccess implements 
CheckpointStorageAccess {
+        private final CheckpointStorageAccess delegate;
+        private final AtomicBoolean failOnClose;
+        private final File path;
+
+        public TestCheckpointStorageAccess(
+                CheckpointStorageAccess delegate, AtomicBoolean failOnClose, 
File file) {
+            this.delegate = delegate;
+            this.failOnClose = failOnClose;
+            this.path = file;
+        }
+
+        @Override
+        public CheckpointStreamFactory resolveCheckpointStorageLocation(
+                long checkpointId, CheckpointStorageLocationReference 
reference) {
+            return ign -> new FailingOnceFsCheckpointOutputStream(path, 100, 
0, failOnClose);
+        }
+
+        @Override
+        public CheckpointStreamFactory.CheckpointStateOutputStream 
createTaskOwnedStateStream()
+                throws IOException {
+            return delegate.createTaskOwnedStateStream();
+        }
+
+        @Override
+        public boolean supportsHighlyAvailableStorage() {
+            return delegate.supportsHighlyAvailableStorage();
+        }
+
+        @Override
+        public boolean hasDefaultSavepointLocation() {
+            return delegate.hasDefaultSavepointLocation();
+        }
+
+        @Override
+        public CompletedCheckpointStorageLocation resolveCheckpoint(String 
externalPointer)
+                throws IOException {
+            return delegate.resolveCheckpoint(externalPointer);
+        }
+
+        @Override
+        public void initializeBaseLocationsForCheckpoint() throws IOException {
+            delegate.initializeBaseLocationsForCheckpoint();
+        }
+
+        @Override
+        public CheckpointStorageLocation initializeLocationForCheckpoint(long 
checkpointId)
+                throws IOException {
+            return delegate.initializeLocationForCheckpoint(checkpointId);
+        }
+
+        @Override
+        public CheckpointStorageLocation initializeLocationForSavepoint(
+                long checkpointId, @Nullable String externalLocationPointer) 
throws IOException {
+            return delegate.initializeLocationForSavepoint(checkpointId, 
externalLocationPointer);
+        }
+    }
+
+    private static class FailingOnceFsCheckpointOutputStream extends 
FsCheckpointStateOutputStream {
+        private final AtomicBoolean failOnClose;
+        private volatile boolean failedCloseAndGetHandle = false;
+
+        public FailingOnceFsCheckpointOutputStream(
+                File path, int bufferSize, int localStateThreshold, 
AtomicBoolean failOnClose)
+                throws IOException {
+            super(
+                    fromLocalFile(path.getAbsoluteFile()),
+                    FileSystem.get(path.toURI()),
+                    bufferSize,
+                    localStateThreshold);
+            this.failOnClose = failOnClose;
+        }
+
+        // called on write success
+        @Override
+        public StreamStateHandle closeAndGetHandle() throws IOException {
+            if (failOnClose.get()) {
+                failedCloseAndGetHandle = true;
+                throw new TestException("failure from closeAndGetHandle");
+            } else {
+                return super.closeAndGetHandle();
+            }
+        }
+
+        // called on no data and on failure (in particular of 
closeAndGetHandle)
+        @Override
+        public void close() {
+            if (failedCloseAndGetHandle && failOnClose.compareAndSet(true, 
false)) {
+                // the contract does allow IO exceptions to be thrown from 
close(),
+                // but FsCheckpointStateOutputStream catches everything, which 
seems risky to change
+                rethrow(new TestException("failure from close"));
+            } else {
+                super.close();
+            }
+        }
+    }
+
+    private static class TestException extends IOException {
+        public TestException(String message) {
+            super(message);
+        }
+    }
+}

Reply via email to