Repository: flink Updated Branches: refs/heads/release-1.1 388acbca9 -> 020da2ce1
[FLINK-5229] [tm] Discard completed checkpoints if a subsequent operator's checkpoint fails In case of a failure of any StreamOperator#snapshotOperatorState method, all StreamTaskStates created up to this point are discarded. This ensures that a failing checkpoint operation of a chained operator won't leave orphaned checkpoint data behind. This closes #2924. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/020da2ce Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/020da2ce Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/020da2ce Branch: refs/heads/release-1.1 Commit: 020da2ce1c8be83789252d0db959896a761d7513 Parents: 388acbc Author: Till Rohrmann <[email protected]> Authored: Fri Dec 2 15:33:06 2016 +0100 Committer: Till Rohrmann <[email protected]> Committed: Fri Dec 2 19:10:46 2016 +0100 ---------------------------------------------------------------------- .../streaming/state/RocksDBStateBackend.java | 214 ++++++++++++------- .../org/apache/flink/util/ExceptionUtils.java | 50 +++++ .../runtime/checkpoint/PendingCheckpoint.java | 14 +- .../state/AsynchronousKvStateSnapshot.java | 5 - .../runtime/state/AsynchronousStateHandle.java | 5 - .../streaming/runtime/tasks/StreamTask.java | 69 ++++-- .../runtime/tasks/StreamTaskState.java | 28 ++- .../tasks/StreamTaskAsyncCheckpointTest.java | 5 + .../streaming/runtime/tasks/StreamTaskTest.java | 105 +++++++++ 9 files changed, 377 insertions(+), 118 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/020da2ce/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index 1561afc..3d75bde 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -77,6 +77,7 @@ import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import org.rocksdb.RocksIterator; +import org.rocksdb.Snapshot; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -638,11 +639,15 @@ public class RocksDBStateBackend extends AbstractStateBackend { */ private static class SemiAsyncSnapshot extends AsynchronousKvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> { private static final long serialVersionUID = 1L; + + private final SerializableObject lock = new SerializableObject(); private final File localBackupPath; private final URI backupUri; private final List<StateDescriptor> stateDescriptors; private final long checkpointId; + private volatile boolean discarded; + private SemiAsyncSnapshot(File localBackupPath, URI backupUri, List<StateDescriptor> columnFamilies, @@ -651,22 +656,45 @@ public class RocksDBStateBackend extends AbstractStateBackend { this.backupUri = backupUri; this.stateDescriptors = columnFamilies; this.checkpointId = checkpointId; + this.discarded = false; } @Override public KvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> materialize() throws Exception { - try { - long startTime = System.currentTimeMillis(); - HDFSCopyFromLocal.copyFromLocal(localBackupPath, backupUri); - long endTime = System.currentTimeMillis(); - LOG.info("RocksDB materialization from " + localBackupPath + " to " + backupUri + " (asynchronous part) took " + (endTime - startTime) + " ms."); - return new 
FinalSemiAsyncSnapshot(backupUri, checkpointId, stateDescriptors); - } catch (Exception e) { - FileSystem fs = FileSystem.get(backupUri, HadoopFileSystem.getHadoopConfiguration()); - fs.delete(new org.apache.hadoop.fs.Path(backupUri), true); - throw e; - } finally { - FileUtils.deleteQuietly(localBackupPath); + synchronized (lock) { + if (discarded) { + throw new Exception("The SemiAsyncSnapshot has already been discarded."); + } else { + try { + long startTime = System.currentTimeMillis(); + HDFSCopyFromLocal.copyFromLocal(localBackupPath, backupUri); + long endTime = System.currentTimeMillis(); + + LOG.info("RocksDB materialization from {} to {} (asynchronous part) took {} ms.", localBackupPath, backupUri, (endTime - startTime)); + + return new FinalSemiAsyncSnapshot(backupUri, checkpointId, stateDescriptors); + } catch (Exception e) { + FileSystem fs = FileSystem.get(backupUri, HadoopFileSystem.getHadoopConfiguration()); + fs.delete(new org.apache.hadoop.fs.Path(backupUri), true); + throw e; + } finally { + discardState(); + } + } + } + } + + @Override + public void discardState() throws Exception { + if (!discarded) { + synchronized (lock) { + if (!discarded) { + discarded = true; + if (!FileUtils.deleteQuietly(localBackupPath)) { + LOG.warn("Could not delete the local backup file stored at {}.", localBackupPath); + } + } + } } } } @@ -732,10 +760,13 @@ public class RocksDBStateBackend extends AbstractStateBackend { private transient org.rocksdb.Snapshot snapshot; private transient AbstractStateBackend backend; + private final SerializableObject lock = new SerializableObject(); private final URI backupUri; private final Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> columnFamilies; private final long checkpointId; + private volatile boolean discarded; + private FullyAsyncSnapshot(org.rocksdb.Snapshot snapshot, AbstractStateBackend backend, URI backupUri, @@ -746,99 +777,122 @@ public class RocksDBStateBackend extends AbstractStateBackend { 
this.backupUri = backupUri; this.columnFamilies = columnFamilies; this.checkpointId = checkpointId; + this.discarded = false; } @Override public KvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> materialize() throws Exception { - long startTime = System.currentTimeMillis(); - CheckpointStateOutputView outputView; - - try { - try { - outputView = backend.createCheckpointStateOutputView(checkpointId, startTime); - } catch (Exception e) { - throw new Exception("Could not create a checkpoint state output view to " + - "materialize the checkpoint data into.", e); - } - - try { - outputView.writeInt(columnFamilies.size()); + synchronized (lock) { + if (discarded) { + throw new Exception("FullyAsyncSnapshot has already been discarded."); + } else { + long startTime = System.currentTimeMillis(); + CheckpointStateOutputView outputView; - // we don't know how many key/value pairs there are in each column family. - // We prefix every written element with a byte that signifies to which - // column family it belongs, this way we can restore the column families - byte count = 0; - Map<String, Byte> columnFamilyMapping = new HashMap<>(); - for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> column : columnFamilies.entrySet()) { - columnFamilyMapping.put(column.getKey(), count); + try { + try { + outputView = backend.createCheckpointStateOutputView(checkpointId, startTime); + } catch (Exception e) { + throw new Exception("Could not create a checkpoint state output view to " + + "materialize the checkpoint data into.", e); + } - outputView.writeByte(count); + try { + outputView.writeInt(columnFamilies.size()); - ObjectOutputStream ooOut = new ObjectOutputStream(outputView); - ooOut.writeObject(column.getValue().f1); - ooOut.flush(); + // we don't know how many key/value pairs there are in each column family. 
+ // We prefix every written element with a byte that signifies to which + // column family it belongs, this way we can restore the column families + byte count = 0; + Map<String, Byte> columnFamilyMapping = new HashMap<>(); + for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> column : columnFamilies.entrySet()) { + columnFamilyMapping.put(column.getKey(), count); - count++; - } + outputView.writeByte(count); - ReadOptions readOptions = new ReadOptions(); - readOptions.setSnapshot(snapshot); + ObjectOutputStream ooOut = new ObjectOutputStream(outputView); + ooOut.writeObject(column.getValue().f1); + ooOut.flush(); - for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> column : columnFamilies.entrySet()) { - byte columnByte = columnFamilyMapping.get(column.getKey()); + count++; + } - synchronized (dbCleanupLock) { - if (db == null) { - throw new RuntimeException("RocksDB instance was disposed. This happens " + - "when we are in the middle of a checkpoint and the job fails."); + ReadOptions readOptions = new ReadOptions(); + readOptions.setSnapshot(snapshot); + + for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> column : columnFamilies.entrySet()) { + byte columnByte = columnFamilyMapping.get(column.getKey()); + + synchronized (dbCleanupLock) { + if (db == null) { + throw new RuntimeException("RocksDB instance was disposed. 
This happens " + + "when we are in the middle of a checkpoint and the job fails."); + } + RocksIterator iterator = db.newIterator(column.getValue().f0, readOptions); + iterator.seekToFirst(); + while (iterator.isValid()) { + outputView.writeByte(columnByte); + BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.key(), + outputView); + BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.value(), + outputView); + iterator.next(); + } + } } - RocksIterator iterator = db.newIterator(column.getValue().f0, readOptions); - iterator.seekToFirst(); - while (iterator.isValid()) { - outputView.writeByte(columnByte); - BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.key(), - outputView); - BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.value(), - outputView); - iterator.next(); + } catch (Exception e) { + try { + // closing the output view deletes the underlying data + outputView.close(); + } catch (Exception closingException) { + LOG.warn("Could not close the checkpoint state output view. The " + + "written data might not be deleted.", closingException); } + + throw new Exception("Could not write the checkpoint data into the checkpoint " + + "state output view.", e); } + } finally { + discardState(); } - } catch (Exception e) { + + StateHandle<DataInputView> stateHandle; + try { - // closing the output view deletes the underlying data - outputView.close(); - } catch (Exception closingException) { - LOG.warn("Could not close the checkpoint state output view. 
The " + - "written data might not be deleted.", closingException); + stateHandle = outputView.closeAndGetHandle(); + } catch (Exception ioE) { + throw new Exception("Could not close the checkpoint state output view and " + + "obtain the state handle.", ioE); } - throw new Exception("Could not write the checkpoint data into the checkpoint " + - "state output view.", e); + long endTime = System.currentTimeMillis(); + LOG.info("Fully asynchronous RocksDB materialization to {} (asynchronous part) took {} ms.", backupUri, (endTime - startTime)); + return new FinalFullyAsyncSnapshot(stateHandle, checkpointId); + } + } + } + + @Override + public void discardState() throws Exception { + if (!discarded) { + final Snapshot snapshotToRelease = snapshot; + + synchronized (lock) { + if (discarded) { + return; + } else { + discarded = true; + snapshot = null; + } } - } finally { + synchronized (dbCleanupLock) { if (db != null) { - db.releaseSnapshot(snapshot); + db.releaseSnapshot(snapshotToRelease); } } - snapshot = null; - } - - StateHandle<DataInputView> stateHandle; - - try { - stateHandle = outputView.closeAndGetHandle(); - } catch (Exception ioE) { - throw new Exception("Could not close the checkpoint state output view and " + - "obtain the state handle.", ioE); } - - long endTime = System.currentTimeMillis(); - LOG.info("Fully asynchronous RocksDB materialization to {} (asynchronous part) took {} ms.", backupUri, (endTime - startTime)); - return new FinalFullyAsyncSnapshot(stateHandle, checkpointId); } - } /** http://git-wip-us.apache.org/repos/asf/flink/blob/020da2ce/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java index 7227006..516cc1d 100644 --- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java +++ 
b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java @@ -26,10 +26,13 @@ package org.apache.flink.util; import org.apache.flink.annotation.Internal; +import javax.annotation.Nullable; import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; +import static org.apache.flink.util.Preconditions.checkNotNull; + @Internal public final class ExceptionUtils { @@ -142,6 +145,53 @@ public final class ExceptionUtils { } /** + * Adds a new exception as a {@link Throwable#addSuppressed(Throwable) suppressed exception} + * to a prior exception, or returns the new exception, if no prior exception exists. + * + * <pre>{@code + * + * public void closeAllThings() throws Exception { + * Exception ex = null; + * try { + * component.shutdown(); + * } catch (Exception e) { + * ex = firstOrSuppressed(e, ex); + * } + * try { + * anotherComponent.stop(); + * } catch (Exception e) { + * ex = firstOrSuppressed(e, ex); + * } + * try { + * lastComponent.shutdown(); + * } catch (Exception e) { + * ex = firstOrSuppressed(e, ex); + * } + * + * if (ex != null) { + * throw ex; + * } + * } + * }</pre> + * + * @param newException The newly occurred exception + * @param previous The previously occurred exception, possibly null. + * + * @return The new exception, if no previous exception exists, or the previous exception with the + * new exception in the list of suppressed exceptions. + */ + public static <T extends Throwable> T firstOrSuppressed(T newException, @Nullable T previous) { + checkNotNull(newException, "newException"); + + if (previous == null) { + return newException; + } else { + previous.addSuppressed(newException); + return previous; + } + } + + /** * Private constructor to prevent instantiation. 
*/ private ExceptionUtils() { http://git-wip-us.apache.org/repos/asf/flink/blob/020da2ce/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java index 6f185bd..ca35417 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java @@ -237,21 +237,23 @@ public class PendingCheckpoint { executor.execute(new Runnable() { @Override public void run() { - try { - for (TaskState taskState: taskStates.values()) { + for (TaskState taskState: taskStates.values()) { + try { taskState.discard(userClassLoader); + } catch (Exception e) { + LOG.warn("Could not properly dispose the task state " + + "belonging to vertex {} of checkpoint {} and job {}.", + taskState.getJobVertexID(), checkpointId, jobId, e); } - } catch (Exception e) { - LOG.warn("Could not properly dispose the pending checkpoint " + - "{} of job {}.", checkpointId, jobId, e); } + + taskStates.clear(); } }); } } finally { discarded = true; - taskStates.clear(); notYetAcknowledgedTasks.clear(); acknowledgedTasks.clear(); } http://git-wip-us.apache.org/repos/asf/flink/blob/020da2ce/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousKvStateSnapshot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousKvStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousKvStateSnapshot.java index c2fc8a4..db9c273 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousKvStateSnapshot.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousKvStateSnapshot.java @@ -52,11 +52,6 @@ public abstract class AsynchronousKvStateSnapshot<K, N, S extends State, SD exte } @Override - public void discardState() throws Exception { - throw new RuntimeException("This should never be called and probably points to a bug."); - } - - @Override public long getStateSize() throws Exception { throw new RuntimeException("This should never be called and probably points to a bug."); } http://git-wip-us.apache.org/repos/asf/flink/blob/020da2ce/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousStateHandle.java index fee1efe..6c77c16 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousStateHandle.java @@ -35,9 +35,4 @@ public abstract class AsynchronousStateHandle<T> implements StateHandle<T> { public final T getState(ClassLoader userCodeClassLoader) throws Exception { throw new UnsupportedOperationException("This must not be called. This is likely an internal bug."); } - - @Override - public final void discardState() throws Exception { - throw new UnsupportedOperationException("This must not be called. 
This is likely an internal bug."); - } } http://git-wip-us.apache.org/repos/asf/flink/blob/020da2ce/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 9531974..d7204a9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -642,7 +642,24 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> for (int i = 0; i < states.length; i++) { StreamOperator<?> operator = allOperators[i]; if (operator != null) { - StreamTaskState state = operator.snapshotOperatorState(checkpointId, timestamp); + StreamTaskState state; + try { + state = operator.snapshotOperatorState(checkpointId, timestamp); + } catch (Exception exception) { + for (int j = 0; j < i; j++) { + if (states[j] != null) { + try { + states[j].discardState(); + } catch (Exception discardException) { + LOG.warn("Could not discard " + j + "th operator state.", discardException); + } + } + } + + throw new Exception("Could not perform the checkpoint for " + i + + "th operator in chain.", exception); + } + if (state.getOperatorState() instanceof AsynchronousStateHandle) { hasAsyncStates = true; } @@ -876,7 +893,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> // ------------------------------------------------------------------------ - private static class AsyncCheckpointThread extends Thread implements Closeable { + static class AsyncCheckpointThread extends Thread implements Closeable { private final StreamTask<?, ?> owner; @@ -900,29 +917,43 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>> @Override public void run() { try { - for (StreamTaskState state : states) { + for (int i = 0; i < states.length; i++) { + StreamTaskState state = states[i]; + if (state != null) { - if (state.getFunctionState() instanceof AsynchronousStateHandle) { - AsynchronousStateHandle<Serializable> asyncState = (AsynchronousStateHandle<Serializable>) state.getFunctionState(); - state.setFunctionState(asyncState.materialize()); - } - if (state.getOperatorState() instanceof AsynchronousStateHandle) { - AsynchronousStateHandle<?> asyncState = (AsynchronousStateHandle<?>) state.getOperatorState(); - state.setOperatorState(asyncState.materialize()); - } - if (state.getKvStates() != null) { - Set<String> keys = state.getKvStates().keySet(); - HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> kvStates = state.getKvStates(); - for (String key: keys) { - if (kvStates.get(key) instanceof AsynchronousKvStateSnapshot) { - AsynchronousKvStateSnapshot<?, ?, ?, ?, ?> asyncHandle = (AsynchronousKvStateSnapshot<?, ?, ?, ?, ?>) kvStates.get(key); - kvStates.put(key, asyncHandle.materialize()); + try { + if (state.getFunctionState() instanceof AsynchronousStateHandle) { + AsynchronousStateHandle<Serializable> asyncState = (AsynchronousStateHandle<Serializable>) state.getFunctionState(); + state.setFunctionState(asyncState.materialize()); + } + if (state.getOperatorState() instanceof AsynchronousStateHandle) { + AsynchronousStateHandle<?> asyncState = (AsynchronousStateHandle<?>) state.getOperatorState(); + state.setOperatorState(asyncState.materialize()); + } + if (state.getKvStates() != null) { + Set<String> keys = state.getKvStates().keySet(); + HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> kvStates = state.getKvStates(); + for (String key : keys) { + if (kvStates.get(key) instanceof AsynchronousKvStateSnapshot) { + AsynchronousKvStateSnapshot<?, ?, ?, ?, ?> asyncHandle = (AsynchronousKvStateSnapshot<?, ?, ?, ?, ?>) kvStates.get(key); + kvStates.put(key, 
asyncHandle.materialize()); + } + } + } + } catch (Exception exception) { + for (int j = 0; j < states.length; j++) { + try { + if (states[j] != null) { states[j].discardState(); } // same null check as the materialize loop + } catch (Exception discardException) { + LOG.warn("Could not discard the " + j + "th operator state.", discardException); } } - } + throw new Exception("Could not materialize the " + i + "th operator state.", exception); + } } } + StreamTaskStateList allStates = new StreamTaskStateList(states); owner.lastCheckpointSize = allStates.getStateSize(); owner.getEnvironment().acknowledgeCheckpoint(checkpointId, allStates); http://git-wip-us.apache.org/repos/asf/flink/blob/020da2ce/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java index 925dd8c..2ffb489 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java @@ -102,12 +102,24 @@ public class StreamTaskState implements Serializable, Closeable { this.operatorState = null; this.functionState = null; this.kvStates = null; + + Exception discardException = null; if (operatorState != null) { - operatorState.discardState(); + try { + operatorState.discardState(); + } catch (Exception exception) { + discardException = new Exception("Could not discard operator state.", exception); + } } if (functionState != null) { - functionState.discardState(); + try { + functionState.discardState(); + } catch (Exception exception) { + Exception newException = new Exception("Could not discard function state.", exception); + + discardException = ExceptionUtils.firstOrSuppressed(newException, discardException); // keep result: previous may be null + } } if (kvStates != null)
{ while (kvStates.size() > 0) { @@ -115,7 +127,13 @@ public class StreamTaskState implements Serializable, Closeable { Iterator<KvStateSnapshot<?, ?, ?, ?, ?>> values = kvStates.values().iterator(); while (values.hasNext()) { KvStateSnapshot<?, ?, ?, ?, ?> s = values.next(); - s.discardState(); + try { + s.discardState(); + } catch (Exception exception) { + Exception newException = new Exception("Could not discard key value state.", exception); + + discardException = ExceptionUtils.firstOrSuppressed(newException, discardException); // keep result: previous may be null + } values.remove(); } } @@ -124,6 +142,10 @@ public class StreamTaskState implements Serializable, Closeable { } } } + + if (discardException != null) { + throw discardException; + } } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/020da2ce/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java index cfaeaad..c7c957f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java @@ -194,6 +194,11 @@ public class StreamTaskAsyncCheckpointTest { } @Override + public void discardState() throws Exception { + // noop + } + + @Override public long getStateSize() { return 0; } http://git-wip-us.apache.org/repos/asf/flink/blob/020da2ce/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index e8315c7..f2dad46 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -43,8 +43,10 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.AsynchronousStateHandle; import org.apache.flink.runtime.state.StateBackendFactory; import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; @@ -53,6 +55,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.ExceptionUtils; @@ -60,6 +63,7 @@ import org.apache.flink.util.SerializedValue; import org.junit.Test; +import org.mockito.internal.util.reflection.Whitebox; import scala.concurrent.ExecutionContext; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; @@ -69,12 +73,17 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.net.URL; import java.util.Collections; +import java.util.HashSet; 
import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.junit.Assert.assertEquals; @@ -191,6 +200,73 @@ public class StreamTaskTest { assertEquals(ExecutionState.CANCELED, task.getExecutionState()); } + /** + * Tests that all created StreamTaskStates are properly cleaned up when a snapshotting method + * of an operator fails. + */ + @Test + public void testStateCleanupWhenFailingCheckpoint() throws Exception { + final long checkpointId = 1L; + final long timestamp = 42L; + + StreamTask<Integer, StreamOperator<Integer>> streamTask = new TestingStreamTask(); + streamTask.setEnvironment(new DummyEnvironment("test task", 1, 0)); + + OperatorChain<Integer> operatorChain = mock(OperatorChain.class); + + StreamOperator<Integer> firstOperator = mock(StreamOperator.class); + StreamTaskState firstStreamTaskState = mock(StreamTaskState.class); + StreamOperator<Integer> secondOperator = mock(StreamOperator.class); + + doReturn(firstStreamTaskState).when(firstOperator).snapshotOperatorState(anyLong(), anyLong()); + doThrow(new Exception("Test Exception")).when(secondOperator).snapshotOperatorState(anyLong(), anyLong()); + + doReturn(new StreamOperator<?>[]{firstOperator, secondOperator}).when(operatorChain).getAllOperators(); + + Whitebox.setInternalState(streamTask, "operatorChain", operatorChain); + Whitebox.setInternalState(streamTask, "isRunning", true); + + try { + streamTask.triggerCheckpoint(checkpointId, timestamp); + fail("Expected exception here."); + } catch (Exception expected) { + // expected failing trigger checkpoint here + } + + verify(firstStreamTaskState).discardState(); + } + + /** + * Tests that the AsyncCheckpointThread 
discards the given StreamTaskStates in case a failure + * occurs while materializing the asynchronous state handles. + */ + @Test + public void testAsyncCheckpointThreadStateCleanup() throws Exception { + final long checkpointId = 1L; + StreamTaskState firstState = mock(StreamTaskState.class); + StreamTaskState secondState = mock(StreamTaskState.class); + AsynchronousStateHandle<Integer> functionStateHandle = mock(AsynchronousStateHandle.class); + + doReturn(functionStateHandle).when(firstState).getFunctionState(); + doThrow(new Exception("Test exception")).when(functionStateHandle).materialize(); + + StreamTask<Integer, StreamOperator<Integer>> owner = mock(StreamTask.class); + StreamTaskState[] states = {firstState, secondState}; + + StreamTask.AsyncCheckpointThread asyncCheckpointThread = new StreamTask.AsyncCheckpointThread( + "AsyncCheckpointThread", + owner, + new HashSet<Closeable>(), + states, + checkpointId); + + asyncCheckpointThread.run(); + + for (StreamTaskState streamTaskState : states) { + verify(streamTaskState).discardState(); + } + } + // ------------------------------------------------------------------------ // Test Utilities // ------------------------------------------------------------------------ @@ -489,4 +565,33 @@ public class StreamTaskTest { interrupt(); } } + + // ------------------------------------------------------------------------ + // ------------------------------------------------------------------------ + + /** + * Testing class for StreamTask methods + */ + private static class TestingStreamTask extends StreamTask<Integer, StreamOperator<Integer>> { + + @Override + protected void init() throws Exception { + + } + + @Override + protected void run() throws Exception { + + } + + @Override + protected void cleanup() throws Exception { + + } + + @Override + protected void cancelTask() throws Exception { + + } + } }
