This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d1997b8 [FLINK-24667][runtime] Fix error handling in
ChannelStateCheckpointWriter
d1997b8 is described below
commit d1997b827a0e21308c57450dd7a6df1e8efa5bce
Author: Roman Khachatryan <[email protected]>
AuthorDate: Fri Oct 29 09:31:10 2021 +0200
[FLINK-24667][runtime] Fix error handling in ChannelStateCheckpointWriter
- Don't propagate a single checkpoint failure, so that the
ChannelStateWriteRequestExecutorImpl loop does not exit
- Handle errors while closing the stream
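- Swap the primary and suppressed exceptions (the new IllegalStateException
becomes primary, the earlier failure is attached as suppressed) to prevent
them from piling up in the stacktrace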
---
.../channel/ChannelStateCheckpointWriter.java | 34 ++-
.../channel/ChannelStateWriteRequest.java | 5 +-
.../ChannelStateWriteRequestExecutorImpl.java | 7 +-
.../state/ttl/mock/MockKeyedStateBackend.java | 17 +-
.../ttl/mock/MockKeyedStateBackendBuilder.java | 10 +-
.../state/ttl/mock/MockOperatorStateBackend.java | 100 +++++++
.../runtime/state/ttl/mock/MockStateBackend.java | 14 +-
.../UnalignedCheckpointFailureHandlingITCase.java | 313 +++++++++++++++++++++
8 files changed, 474 insertions(+), 26 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java
index 368bb04..f84cf50 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java
@@ -51,6 +51,8 @@ import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static java.util.UUID.randomUUID;
import static org.apache.flink.runtime.state.CheckpointedStateScope.EXCLUSIVE;
+import static org.apache.flink.util.ExceptionUtils.findThrowable;
+import static org.apache.flink.util.ExceptionUtils.rethrow;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
@@ -99,8 +101,7 @@ class ChannelStateCheckpointWriter {
ChannelStateWriteResult result,
CheckpointStateOutputStream stream,
ChannelStateSerializer serializer,
- RunnableWithException onComplete)
- throws Exception {
+ RunnableWithException onComplete) {
this(
taskName,
subtaskIndex,
@@ -121,8 +122,7 @@ class ChannelStateCheckpointWriter {
ChannelStateSerializer serializer,
RunnableWithException onComplete,
CheckpointStateOutputStream checkpointStateOutputStream,
- DataOutputStream dataStream)
- throws Exception {
+ DataOutputStream dataStream) {
this.taskName = taskName;
this.subtaskIndex = subtaskIndex;
this.checkpointId = checkpointId;
@@ -134,7 +134,7 @@ class ChannelStateCheckpointWriter {
runWithChecks(() -> serializer.writeHeader(dataStream));
}
- void writeInput(InputChannelInfo info, Buffer buffer) throws Exception {
+ void writeInput(InputChannelInfo info, Buffer buffer) {
write(
inputChannelOffsets,
info,
@@ -143,7 +143,7 @@ class ChannelStateCheckpointWriter {
"ChannelStateCheckpointWriter#writeInput");
}
- void writeOutput(ResultSubpartitionInfo info, Buffer buffer) throws
Exception {
+ void writeOutput(ResultSubpartitionInfo info, Buffer buffer) {
write(
resultSubpartitionOffsets,
info,
@@ -157,8 +157,7 @@ class ChannelStateCheckpointWriter {
K key,
Buffer buffer,
boolean precondition,
- String action)
- throws Exception {
+ String action) {
try {
if (result.isDone()) {
return;
@@ -290,19 +289,30 @@ class ChannelStateCheckpointWriter {
}
}
- private void runWithChecks(RunnableWithException r) throws Exception {
+ private void runWithChecks(RunnableWithException r) {
try {
checkState(!result.isDone(), "result is already completed",
result);
r.run();
} catch (Exception e) {
fail(e);
- throw e;
+ if (!findThrowable(e, IOException.class).isPresent()) {
+ rethrow(e);
+ }
}
}
- public void fail(Throwable e) throws Exception {
+ public void fail(Throwable e) {
result.fail(e);
- checkpointStream.close();
+ try {
+ checkpointStream.close();
+ } catch (Exception closeException) {
+ String message = "Unable to close checkpointStream after a
failure";
+ if (findThrowable(closeException, IOException.class).isPresent()) {
+ LOG.warn(message, closeException);
+ } else {
+ throw new RuntimeException(message, closeException);
+ }
+ }
}
private interface HandleFactory<I, H extends
AbstractChannelStateHandle<I>> {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java
index 17be93b..d8d1923 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java
@@ -22,10 +22,10 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.function.BiConsumerWithException;
import org.apache.flink.util.function.ThrowingConsumer;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
import static
org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequestState.CANCELLED;
import static
org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequestState.COMPLETED;
@@ -76,8 +76,7 @@ interface ChannelStateWriteRequest {
long checkpointId,
String name,
CloseableIterator<Buffer> iterator,
- BiConsumerWithException<ChannelStateCheckpointWriter, Buffer,
Exception>
- bufferConsumer) {
+ BiConsumer<ChannelStateCheckpointWriter, Buffer> bufferConsumer) {
return new CheckpointInProgressRequest(
name,
checkpointId,
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java
index 08fb2d5..5c12a77 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java
@@ -149,8 +149,11 @@ class ChannelStateWriteRequestExecutorImpl implements
ChannelStateWriteRequestEx
// checking before is not enough because (check + enqueue) is not
atomic
if (wasClosed || !thread.isAlive()) {
cleanupRequests();
- throw ExceptionUtils.firstOrSuppressed(
- new IllegalStateException("not running"), thrown);
+ IllegalStateException exception = new IllegalStateException("not
running");
+ if (thrown != null) {
+ exception.addSuppressed(thrown);
+ }
+ throw exception;
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
index 2f1aab4..da7e74c 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
@@ -66,6 +66,9 @@ import java.util.stream.Stream;
/** State backend which produces in memory mock state objects. */
public class MockKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
+ /** Whether to create empty snapshot ({@link MockKeyedStateHandle} isn't
recognized by JM). */
+ private final boolean emptySnapshot;
+
private interface StateFactory {
<N, SV, S extends State, IS extends S> IS createInternalState(
TypeSerializer<N> namespaceSerializer, StateDescriptor<S, SV>
stateDesc)
@@ -105,7 +108,8 @@ public class MockKeyedStateBackend<K> extends
AbstractKeyedStateBackend<K> {
Map<String, Map<K, Map<Object, Object>>> stateValues,
Map<String, StateSnapshotTransformer<Object>> stateSnapshotFilters,
CloseableRegistry cancelStreamRegistry,
- InternalKeyContext<K> keyContext) {
+ InternalKeyContext<K> keyContext,
+ boolean emptySnapshot) {
super(
kvStateRegistry,
keySerializer,
@@ -117,6 +121,7 @@ public class MockKeyedStateBackend<K> extends
AbstractKeyedStateBackend<K> {
keyContext);
this.stateValues = stateValues;
this.stateSnapshotFilters = stateSnapshotFilters;
+ this.emptySnapshot = emptySnapshot;
}
@Override
@@ -221,9 +226,11 @@ public class MockKeyedStateBackend<K> extends
AbstractKeyedStateBackend<K> {
@Nonnull CheckpointOptions checkpointOptions) {
return new FutureTask<>(
() ->
- SnapshotResult.of(
- new MockKeyedStateHandle<>(
- copy(stateValues,
stateSnapshotFilters))));
+ emptySnapshot
+ ? SnapshotResult.empty()
+ : SnapshotResult.of(
+ new MockKeyedStateHandle<>(
+ copy(stateValues,
stateSnapshotFilters))));
}
@Nonnull
@@ -309,7 +316,7 @@ public class MockKeyedStateBackend<K> extends
AbstractKeyedStateBackend<K> {
@Override
public long getStateSize() {
- throw new UnsupportedOperationException();
+ return 0; // state size unknown
}
@Override
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackendBuilder.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackendBuilder.java
index 4d76286..a801aa9 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackendBuilder.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackendBuilder.java
@@ -43,6 +43,9 @@ import java.util.Map;
* @param <K> The data type that the key serializer serializes.
*/
public class MockKeyedStateBackendBuilder<K> extends
AbstractKeyedStateBackendBuilder<K> {
+
+ private final boolean emptySnapshot;
+
public MockKeyedStateBackendBuilder(
TaskKvStateRegistry kvStateRegistry,
TypeSerializer<K> keySerializer,
@@ -54,7 +57,8 @@ public class MockKeyedStateBackendBuilder<K> extends
AbstractKeyedStateBackendBu
LatencyTrackingStateConfig latencyTrackingStateConfig,
@Nonnull Collection<KeyedStateHandle> stateHandles,
StreamCompressionDecorator keyGroupCompressionDecorator,
- CloseableRegistry cancelStreamRegistry) {
+ CloseableRegistry cancelStreamRegistry,
+ boolean emptySnapshot) {
super(
kvStateRegistry,
keySerializer,
@@ -67,6 +71,7 @@ public class MockKeyedStateBackendBuilder<K> extends
AbstractKeyedStateBackendBu
stateHandles,
keyGroupCompressionDecorator,
cancelStreamRegistry);
+ this.emptySnapshot = emptySnapshot;
}
@Override
@@ -86,6 +91,7 @@ public class MockKeyedStateBackendBuilder<K> extends
AbstractKeyedStateBackendBu
stateValues,
stateSnapshotFilters,
cancelStreamRegistry,
- new InternalKeyContextImpl<>(keyGroupRange,
numberOfKeyGroups));
+ new InternalKeyContextImpl<>(keyGroupRange, numberOfKeyGroups),
+ emptySnapshot);
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockOperatorStateBackend.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockOperatorStateBackend.java
new file mode 100644
index 0000000..046b7a6
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockOperatorStateBackend.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl.mock;
+
+import org.apache.flink.api.common.state.BroadcastState;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RunnableFuture;
+
+class MockOperatorStateBackend implements OperatorStateBackend {
+
+ private final HashSet<String> registeredStateNames = new HashSet<>();
+ private final boolean emptySnapshot;
+
+ public MockOperatorStateBackend(boolean emptySnapshot) {
+ this.emptySnapshot = emptySnapshot;
+ }
+
+ @Override
+ public <S> ListState<S> getListState(ListStateDescriptor<S>
stateDescriptor) throws Exception {
+ registeredStateNames.add(stateDescriptor.getName());
+ ListState<S> state =
+ MockInternalListState.createState(
+ stateDescriptor.getElementSerializer(),
stateDescriptor);
+ ((MockInternalKvState) state).values = HashMap::new;
+ return state;
+ }
+
+ @Override
+ public <S> ListState<S> getUnionListState(ListStateDescriptor<S>
stateDescriptor)
+ throws Exception {
+ return getListState(stateDescriptor);
+ }
+
+ @Override
+ public <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K,
V> stateDescriptor)
+ throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Set<String> getRegisteredStateNames() {
+ return registeredStateNames;
+ }
+
+ @Override
+ public Set<String> getRegisteredBroadcastStateNames() {
+ return Collections.emptySet();
+ }
+
+ @Nonnull
+ @Override
+ public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
+ long checkpointId,
+ long timestamp,
+ @Nonnull CheckpointStreamFactory streamFactory,
+ @Nonnull CheckpointOptions checkpointOptions)
+ throws Exception {
+ if (!emptySnapshot) {
+ throw new UnsupportedOperationException();
+ }
+ return new FutureTask<>(SnapshotResult::empty);
+ }
+
+ @Override
+ public void dispose() {}
+
+ @Override
+ public void close() throws IOException {}
+}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java
index 3ad4bfc..d94644c 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java
@@ -42,6 +42,15 @@ import java.util.Collection;
/** mack state backend. */
public class MockStateBackend extends AbstractStateBackend {
private static final long serialVersionUID = 995676510267499393L;
+ private final boolean emptySnapshot;
+
+ public MockStateBackend() {
+ this(false);
+ }
+
+ public MockStateBackend(boolean emptySnapshot) {
+ this.emptySnapshot = emptySnapshot;
+ }
@Override
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
@@ -67,7 +76,8 @@ public class MockStateBackend extends AbstractStateBackend {
LatencyTrackingStateConfig.disabled(),
stateHandles,
AbstractStateBackend.getCompressionDecorator(env.getExecutionConfig()),
- cancelStreamRegistry)
+ cancelStreamRegistry,
+ emptySnapshot)
.build();
}
@@ -77,6 +87,6 @@ public class MockStateBackend extends AbstractStateBackend {
String operatorIdentifier,
@Nonnull Collection<OperatorStateHandle> stateHandles,
CloseableRegistry cancelStreamRegistry) {
- throw new UnsupportedOperationException();
+ return new MockOperatorStateBackend(emptySnapshot);
}
}
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java
new file mode 100644
index 0000000..15252c7
--- /dev/null
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream;
+import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
+import org.apache.flink.runtime.state.ttl.mock.MockStateBackend;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.SharedObjects;
+import org.apache.flink.testutils.junit.SharedReference;
+import org.apache.flink.util.SerializedThrowable;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static java.util.Collections.singletonList;
+import static org.apache.flink.api.common.JobStatus.RUNNING;
+import static org.apache.flink.api.common.time.Deadline.fromNow;
+import static org.apache.flink.core.fs.Path.fromLocalFile;
+import static
org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
+import static
org.apache.flink.runtime.testutils.CommonTestUtils.waitForJobStatus;
+import static org.apache.flink.util.ExceptionUtils.findThrowable;
+import static org.apache.flink.util.ExceptionUtils.rethrow;
+
+/**
+ * Tests failure handling in channel state persistence.
+ *
+ * @see <a
href="https://issues.apache.org/jira/browse/FLINK-24667">FLINK-24667</a>
+ */
+public class UnalignedCheckpointFailureHandlingITCase {
+
+ private static final int PARALLELISM = 2;
+
+ @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ @Rule public final SharedObjects sharedObjects = SharedObjects.create();
+
+ @Rule
+ public final MiniClusterWithClientResource miniClusterResource =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(PARALLELISM)
+ .setNumberSlotsPerTaskManager(1)
+ .build());
+
+ @Test
+ public void testCheckpointSuccessAfterFailure() throws Exception {
+ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ TestCheckpointStorage storage =
+ new TestCheckpointStorage(
+ new JobManagerCheckpointStorage(), sharedObjects,
temporaryFolder);
+
+ configure(env, storage);
+ buildGraph(env);
+
+ JobClient jobClient = env.executeAsync();
+ JobID jobID = jobClient.getJobID();
+ MiniCluster miniCluster = miniClusterResource.getMiniCluster();
+
+ waitForJobStatus(jobClient, singletonList(RUNNING),
fromNow(Duration.ofSeconds(30)));
+ waitForAllTaskRunning(miniCluster, jobID, false);
+
+ triggerFailingCheckpoint(jobID, TestException.class, miniCluster);
+
+ miniCluster.triggerCheckpoint(jobID).get();
+ }
+
+ private void configure(StreamExecutionEnvironment env,
TestCheckpointStorage storage) {
+ // enable checkpointing but only via API
+ env.enableCheckpointing(Long.MAX_VALUE,
CheckpointingMode.EXACTLY_ONCE);
+
+ env.getCheckpointConfig().setCheckpointStorage(storage);
+
+ // use non-snapshotting backend to test channel state persistence
integration with
+ // checkpoint storage
+ env.setStateBackend(new MockStateBackend(true));
+
+ env.getCheckpointConfig().enableUnalignedCheckpoints();
+
+ env.getCheckpointConfig().setAlignedCheckpointTimeout(Duration.ZERO);
// speed-up
+
+ // failures are emitted by the storage
+
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);
+
+ // DoP > 1 is required for some barriers to lag
+ env.setParallelism(PARALLELISM);
+
+ // no chaining to have input channels (doesn't matter local or remote)
+ env.disableOperatorChaining();
+ }
+
+ private void buildGraph(StreamExecutionEnvironment env) {
+ // with zero alignment timeout some steps here are not strictly
necessary currently, but
+ // this may change in the future
+ env.fromSource(
+ new NumberSequenceSource(0, Long.MAX_VALUE),
+ WatermarkStrategy.noWatermarks(),
+ "num-source")
+ // source is not parallel, so keyBy to send to all down-streams
+ .keyBy(value -> value)
+ // exert back-pressure
+ .map(
+ value -> {
+ Thread.sleep(1);
+ return value;
+ })
+ .addSink(new DiscardingSink<>());
+ }
+
+ private void triggerFailingCheckpoint(
+ JobID jobID, Class<TestException> expectedException, MiniCluster
miniCluster)
+ throws InterruptedException, ExecutionException {
+ while (true) {
+ Optional<Throwable> cpFailure =
+ miniCluster
+ .triggerCheckpoint(jobID)
+ .thenApply(ign -> Optional.empty())
+ .handle((ign, err) -> Optional.ofNullable(err))
+ .get();
+ if (!cpFailure.isPresent()) {
+ Thread.sleep(50); // trigger again - in case of no channel
data was written
+ } else if (isCausedBy(cpFailure.get(), expectedException)) {
+ return;
+ } else {
+ rethrow(cpFailure.get());
+ }
+ }
+ }
+
+ private boolean isCausedBy(Throwable t, Class<TestException>
expectedException) {
+ return findThrowable(t, SerializedThrowable.class)
+ .flatMap(
+ st -> {
+ Throwable deser =
st.deserializeError(getClass().getClassLoader());
+ return findThrowable(deser, expectedException);
+ })
+ .isPresent();
+ }
+
+ private static class TestCheckpointStorage implements CheckpointStorage {
+ private final CheckpointStorage delegate;
+ private final SharedReference<AtomicBoolean> failOnCloseRef;
+ private final SharedReference<TemporaryFolder> tempFolderRef;
+
+ private TestCheckpointStorage(
+ CheckpointStorage delegate,
+ SharedObjects sharedObjects,
+ TemporaryFolder tempFolder) {
+ this.delegate = delegate;
+ this.failOnCloseRef = sharedObjects.add(new AtomicBoolean(true));
+ this.tempFolderRef = sharedObjects.add(tempFolder);
+ }
+
+ @Override
+ public CheckpointStorageAccess createCheckpointStorage(JobID jobId)
throws IOException {
+ return new TestCheckpointStorageAccess(
+ delegate.createCheckpointStorage(jobId),
+ failOnCloseRef.get(),
+ tempFolderRef.get().newFolder());
+ }
+
+ @Override
+ public CompletedCheckpointStorageLocation resolveCheckpoint(String
externalPointer)
+ throws IOException {
+ return delegate.resolveCheckpoint(externalPointer);
+ }
+ }
+
+ private static class TestCheckpointStorageAccess implements
CheckpointStorageAccess {
+ private final CheckpointStorageAccess delegate;
+ private final AtomicBoolean failOnClose;
+ private final File path;
+
+ public TestCheckpointStorageAccess(
+ CheckpointStorageAccess delegate, AtomicBoolean failOnClose,
File file) {
+ this.delegate = delegate;
+ this.failOnClose = failOnClose;
+ this.path = file;
+ }
+
+ @Override
+ public CheckpointStreamFactory resolveCheckpointStorageLocation(
+ long checkpointId, CheckpointStorageLocationReference
reference) {
+ return ign -> new FailingOnceFsCheckpointOutputStream(path, 100,
0, failOnClose);
+ }
+
+ @Override
+ public CheckpointStreamFactory.CheckpointStateOutputStream
createTaskOwnedStateStream()
+ throws IOException {
+ return delegate.createTaskOwnedStateStream();
+ }
+
+ @Override
+ public boolean supportsHighlyAvailableStorage() {
+ return delegate.supportsHighlyAvailableStorage();
+ }
+
+ @Override
+ public boolean hasDefaultSavepointLocation() {
+ return delegate.hasDefaultSavepointLocation();
+ }
+
+ @Override
+ public CompletedCheckpointStorageLocation resolveCheckpoint(String
externalPointer)
+ throws IOException {
+ return delegate.resolveCheckpoint(externalPointer);
+ }
+
+ @Override
+ public void initializeBaseLocationsForCheckpoint() throws IOException {
+ delegate.initializeBaseLocationsForCheckpoint();
+ }
+
+ @Override
+ public CheckpointStorageLocation initializeLocationForCheckpoint(long
checkpointId)
+ throws IOException {
+ return delegate.initializeLocationForCheckpoint(checkpointId);
+ }
+
+ @Override
+ public CheckpointStorageLocation initializeLocationForSavepoint(
+ long checkpointId, @Nullable String externalLocationPointer)
throws IOException {
+ return delegate.initializeLocationForSavepoint(checkpointId,
externalLocationPointer);
+ }
+ }
+
+ private static class FailingOnceFsCheckpointOutputStream extends
FsCheckpointStateOutputStream {
+ private final AtomicBoolean failOnClose;
+ private volatile boolean failedCloseAndGetHandle = false;
+
+ public FailingOnceFsCheckpointOutputStream(
+ File path, int bufferSize, int localStateThreshold,
AtomicBoolean failOnClose)
+ throws IOException {
+ super(
+ fromLocalFile(path.getAbsoluteFile()),
+ FileSystem.get(path.toURI()),
+ bufferSize,
+ localStateThreshold);
+ this.failOnClose = failOnClose;
+ }
+
+ // called on write success
+ @Override
+ public StreamStateHandle closeAndGetHandle() throws IOException {
+ if (failOnClose.get()) {
+ failedCloseAndGetHandle = true;
+ throw new TestException("failure from closeAndGetHandle");
+ } else {
+ return super.closeAndGetHandle();
+ }
+ }
+
+ // called on no data and on failure (in particular of
closeAndGetHandle)
+ @Override
+ public void close() {
+ if (failedCloseAndGetHandle && failOnClose.compareAndSet(true,
false)) {
+ // the contract does allow IO exceptions to be thrown from
close(),
+ // but FsCheckpointStateOutputStream catches everything, which
seems risky to change
+ rethrow(new TestException("failure from close"));
+ } else {
+ super.close();
+ }
+ }
+ }
+
+ private static class TestException extends IOException {
+ public TestException(String message) {
+ super(message);
+ }
+ }
+}