Repository: flink
Updated Branches:
  refs/heads/release-1.1 388acbca9 -> 020da2ce1


[FLINK-5229] [tm] Discard completed checkpoints if a subsequent operator's checkpoint fails

If any StreamOperator#snapshotOperatorState call fails, all StreamTaskStates
created up to that point are discarded. This ensures that a failing checkpoint
operation of a chained operator won't leave orphaned checkpoint data behind.

This closes #2924.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/020da2ce
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/020da2ce
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/020da2ce

Branch: refs/heads/release-1.1
Commit: 020da2ce1c8be83789252d0db959896a761d7513
Parents: 388acbc
Author: Till Rohrmann <[email protected]>
Authored: Fri Dec 2 15:33:06 2016 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Fri Dec 2 19:10:46 2016 +0100

----------------------------------------------------------------------
 .../streaming/state/RocksDBStateBackend.java    | 214 ++++++++++++-------
 .../org/apache/flink/util/ExceptionUtils.java   |  50 +++++
 .../runtime/checkpoint/PendingCheckpoint.java   |  14 +-
 .../state/AsynchronousKvStateSnapshot.java      |   5 -
 .../runtime/state/AsynchronousStateHandle.java  |   5 -
 .../streaming/runtime/tasks/StreamTask.java     |  69 ++++--
 .../runtime/tasks/StreamTaskState.java          |  28 ++-
 .../tasks/StreamTaskAsyncCheckpointTest.java    |   5 +
 .../streaming/runtime/tasks/StreamTaskTest.java | 105 +++++++++
 9 files changed, 377 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/020da2ce/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 1561afc..3d75bde 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -77,6 +77,7 @@ import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.RocksIterator;
 
+import org.rocksdb.Snapshot;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -638,11 +639,15 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
         */
        private static class SemiAsyncSnapshot extends 
AsynchronousKvStateSnapshot<Object, Object, ValueState<Object>, 
ValueStateDescriptor<Object>, RocksDBStateBackend> {
                private static final long serialVersionUID = 1L;
+
+               private final SerializableObject lock = new 
SerializableObject();
                private final File localBackupPath;
                private final URI backupUri;
                private final List<StateDescriptor> stateDescriptors;
                private final long checkpointId;
 
+               private volatile boolean discarded;
+
                private SemiAsyncSnapshot(File localBackupPath,
                                URI backupUri,
                                List<StateDescriptor> columnFamilies,
@@ -651,22 +656,45 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
                        this.backupUri = backupUri;
                        this.stateDescriptors = columnFamilies;
                        this.checkpointId = checkpointId;
+                       this.discarded = false;
                }
 
                @Override
                public KvStateSnapshot<Object, Object, ValueState<Object>, 
ValueStateDescriptor<Object>, RocksDBStateBackend> materialize() throws 
Exception {
-                       try {
-                               long startTime = System.currentTimeMillis();
-                               
HDFSCopyFromLocal.copyFromLocal(localBackupPath, backupUri);
-                               long endTime = System.currentTimeMillis();
-                               LOG.info("RocksDB materialization from " + 
localBackupPath + " to " + backupUri + " (asynchronous part) took " + (endTime 
- startTime) + " ms.");
-                               return new FinalSemiAsyncSnapshot(backupUri, 
checkpointId, stateDescriptors);
-                       } catch (Exception e) {
-                               FileSystem fs = FileSystem.get(backupUri, 
HadoopFileSystem.getHadoopConfiguration());
-                               fs.delete(new 
org.apache.hadoop.fs.Path(backupUri), true);
-                               throw e;
-                       } finally {
-                               FileUtils.deleteQuietly(localBackupPath);
+                       synchronized (lock) {
+                               if (discarded) {
+                                       throw new Exception("The 
SemiAsyncSnapshot has already been discarded.");
+                               } else {
+                                       try {
+                                               long startTime = 
System.currentTimeMillis();
+                                               
HDFSCopyFromLocal.copyFromLocal(localBackupPath, backupUri);
+                                               long endTime = 
System.currentTimeMillis();
+
+                                               LOG.info("RocksDB 
materialization from {} to {} (asynchronous part) took {} ms.", 
localBackupPath, backupUri, (endTime - startTime));
+
+                                               return new 
FinalSemiAsyncSnapshot(backupUri, checkpointId, stateDescriptors);
+                                       } catch (Exception e) {
+                                               FileSystem fs = 
FileSystem.get(backupUri, HadoopFileSystem.getHadoopConfiguration());
+                                               fs.delete(new 
org.apache.hadoop.fs.Path(backupUri), true);
+                                               throw e;
+                                       } finally {
+                                               discardState();
+                                       }
+                               }
+                       }
+               }
+
+               @Override
+               public void discardState() throws Exception {
+                       if (!discarded) {
+                               synchronized (lock) {
+                                       if (!discarded) {
+                                               discarded = true;
+                                               if 
(!FileUtils.deleteQuietly(localBackupPath)) {
+                                                       LOG.warn("Could not 
delete the local backup file stored at {}.", localBackupPath);
+                                               }
+                                       }
+                               }
                        }
                }
        }
@@ -732,10 +760,13 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
                private transient org.rocksdb.Snapshot snapshot;
                private transient AbstractStateBackend backend;
 
+               private final SerializableObject lock = new 
SerializableObject();
                private final URI backupUri;
                private final Map<String, Tuple2<ColumnFamilyHandle, 
StateDescriptor>> columnFamilies;
                private final long checkpointId;
 
+               private volatile boolean discarded;
+
                private FullyAsyncSnapshot(org.rocksdb.Snapshot snapshot,
                                AbstractStateBackend backend,
                                URI backupUri,
@@ -746,99 +777,122 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
                        this.backupUri = backupUri;
                        this.columnFamilies = columnFamilies;
                        this.checkpointId = checkpointId;
+                       this.discarded = false;
                }
 
                @Override
                public KvStateSnapshot<Object, Object, ValueState<Object>, 
ValueStateDescriptor<Object>, RocksDBStateBackend> materialize() throws 
Exception {
-                       long startTime = System.currentTimeMillis();
-                       CheckpointStateOutputView outputView;
-
-                       try {
-                               try {
-                                       outputView = 
backend.createCheckpointStateOutputView(checkpointId, startTime);
-                               } catch (Exception e) {
-                                       throw new Exception("Could not create a 
checkpoint state output view to " +
-                                               "materialize the checkpoint 
data into.", e);
-                               }
-
-                               try {
-                                       
outputView.writeInt(columnFamilies.size());
+                       synchronized (lock) {
+                               if (discarded) {
+                                       throw new Exception("FullyAsyncSnapshot 
has already been discarded.");
+                               } else {
+                                       long startTime = 
System.currentTimeMillis();
+                                       CheckpointStateOutputView outputView;
 
-                                       // we don't know how many key/value 
pairs there are in each column family.
-                                       // We prefix every written element with 
a byte that signifies to which
-                                       // column family it belongs, this way 
we can restore the column families
-                                       byte count = 0;
-                                       Map<String, Byte> columnFamilyMapping = 
new HashMap<>();
-                                       for (Map.Entry<String, 
Tuple2<ColumnFamilyHandle, StateDescriptor>> column : 
columnFamilies.entrySet()) {
-                                               
columnFamilyMapping.put(column.getKey(), count);
+                                       try {
+                                               try {
+                                                       outputView = 
backend.createCheckpointStateOutputView(checkpointId, startTime);
+                                               } catch (Exception e) {
+                                                       throw new 
Exception("Could not create a checkpoint state output view to " +
+                                                               "materialize 
the checkpoint data into.", e);
+                                               }
 
-                                               outputView.writeByte(count);
+                                               try {
+                                                       
outputView.writeInt(columnFamilies.size());
 
-                                               ObjectOutputStream ooOut = new 
ObjectOutputStream(outputView);
-                                               
ooOut.writeObject(column.getValue().f1);
-                                               ooOut.flush();
+                                                       // we don't know how 
many key/value pairs there are in each column family.
+                                                       // We prefix every 
written element with a byte that signifies to which
+                                                       // column family it 
belongs, this way we can restore the column families
+                                                       byte count = 0;
+                                                       Map<String, Byte> 
columnFamilyMapping = new HashMap<>();
+                                                       for (Map.Entry<String, 
Tuple2<ColumnFamilyHandle, StateDescriptor>> column : 
columnFamilies.entrySet()) {
+                                                               
columnFamilyMapping.put(column.getKey(), count);
 
-                                               count++;
-                                       }
+                                                               
outputView.writeByte(count);
 
-                                       ReadOptions readOptions = new 
ReadOptions();
-                                       readOptions.setSnapshot(snapshot);
+                                                               
ObjectOutputStream ooOut = new ObjectOutputStream(outputView);
+                                                               
ooOut.writeObject(column.getValue().f1);
+                                                               ooOut.flush();
 
-                                       for (Map.Entry<String, 
Tuple2<ColumnFamilyHandle, StateDescriptor>> column : 
columnFamilies.entrySet()) {
-                                               byte columnByte = 
columnFamilyMapping.get(column.getKey());
+                                                               count++;
+                                                       }
 
-                                               synchronized (dbCleanupLock) {
-                                                       if (db == null) {
-                                                               throw new 
RuntimeException("RocksDB instance was disposed. This happens " +
-                                                                       "when 
we are in the middle of a checkpoint and the job fails.");
+                                                       ReadOptions readOptions 
= new ReadOptions();
+                                                       
readOptions.setSnapshot(snapshot);
+
+                                                       for (Map.Entry<String, 
Tuple2<ColumnFamilyHandle, StateDescriptor>> column : 
columnFamilies.entrySet()) {
+                                                               byte columnByte 
= columnFamilyMapping.get(column.getKey());
+
+                                                               synchronized 
(dbCleanupLock) {
+                                                                       if (db 
== null) {
+                                                                               
throw new RuntimeException("RocksDB instance was disposed. This happens " +
+                                                                               
        "when we are in the middle of a checkpoint and the job fails.");
+                                                                       }
+                                                                       
RocksIterator iterator = db.newIterator(column.getValue().f0, readOptions);
+                                                                       
iterator.seekToFirst();
+                                                                       while 
(iterator.isValid()) {
+                                                                               
outputView.writeByte(columnByte);
+                                                                               
BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.key(),
+                                                                               
        outputView);
+                                                                               
BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.value(),
+                                                                               
        outputView);
+                                                                               
iterator.next();
+                                                                       }
+                                                               }
                                                        }
-                                                       RocksIterator iterator 
= db.newIterator(column.getValue().f0, readOptions);
-                                                       iterator.seekToFirst();
-                                                       while 
(iterator.isValid()) {
-                                                               
outputView.writeByte(columnByte);
-                                                               
BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.key(),
-                                                                       
outputView);
-                                                               
BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.value(),
-                                                                       
outputView);
-                                                               iterator.next();
+                                               } catch (Exception e) {
+                                                       try {
+                                                               // closing the 
output view deletes the underlying data
+                                                               
outputView.close();
+                                                       } catch (Exception 
closingException) {
+                                                               LOG.warn("Could 
not close the checkpoint state output view. The " +
+                                                                       
"written data might not be deleted.", closingException);
                                                        }
+
+                                                       throw new 
Exception("Could not write the checkpoint data into the checkpoint " +
+                                                               "state output 
view.", e);
                                                }
+                                       } finally {
+                                               discardState();
                                        }
-                               } catch (Exception e) {
+
+                                       StateHandle<DataInputView> stateHandle;
+
                                        try {
-                                               // closing the output view 
deletes the underlying data
-                                               outputView.close();
-                                       } catch (Exception closingException) {
-                                               LOG.warn("Could not close the 
checkpoint state output view. The " +
-                                                       "written data might not 
be deleted.", closingException);
+                                               stateHandle = 
outputView.closeAndGetHandle();
+                                       } catch (Exception ioE) {
+                                               throw new Exception("Could not 
close the checkpoint state output view and " +
+                                                       "obtain the state 
handle.", ioE);
                                        }
 
-                                       throw new Exception("Could not write 
the checkpoint data into the checkpoint " +
-                                               "state output view.", e);
+                                       long endTime = 
System.currentTimeMillis();
+                                       LOG.info("Fully asynchronous RocksDB 
materialization to {} (asynchronous part) took {} ms.", backupUri, (endTime - 
startTime));
+                                       return new 
FinalFullyAsyncSnapshot(stateHandle, checkpointId);
+                               }
+                       }
+               }
+
+               @Override
+               public void discardState() throws Exception {
+                       if (!discarded) {
+                               final Snapshot snapshotToRelease = snapshot;
+
+                               synchronized (lock) {
+                                       if (discarded) {
+                                               return;
+                                       } else {
+                                               discarded = true;
+                                               snapshot = null;
+                                       }
                                }
-                       } finally {
+
                                synchronized (dbCleanupLock) {
                                        if (db != null) {
-                                               db.releaseSnapshot(snapshot);
+                                               
db.releaseSnapshot(snapshotToRelease);
                                        }
                                }
-                               snapshot = null;
-                       }
-
-                       StateHandle<DataInputView> stateHandle;
-
-                       try {
-                               stateHandle = outputView.closeAndGetHandle();
-                       } catch (Exception ioE) {
-                               throw new Exception("Could not close the 
checkpoint state output view and " +
-                                       "obtain the state handle.", ioE);
                        }
-
-                       long endTime = System.currentTimeMillis();
-                       LOG.info("Fully asynchronous RocksDB materialization to 
{} (asynchronous part) took {} ms.", backupUri, (endTime - startTime));
-                       return new FinalFullyAsyncSnapshot(stateHandle, 
checkpointId);
                }
-
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/020da2ce/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java 
b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index 7227006..516cc1d 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -26,10 +26,13 @@ package org.apache.flink.util;
 
 import org.apache.flink.annotation.Internal;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 @Internal
 public final class ExceptionUtils {
 
@@ -142,6 +145,53 @@ public final class ExceptionUtils {
        }
 
        /**
+        * Adds a new exception as a {@link Throwable#addSuppressed(Throwable) 
suppressed exception}
+        * to a prior exception, or returns the new exception, if no prior 
exception exists.
+        *
+        * <pre>{@code
+        *
+        * public void closeAllThings() throws Exception {
+        *     Exception ex = null;
+        *     try {
+        *         component.shutdown();
+        *     } catch (Exception e) {
+        *         ex = firstOrSuppressed(e, ex);
+        *     }
+        *     try {
+        *         anotherComponent.stop();
+        *     } catch (Exception e) {
+        *         ex = firstOrSuppressed(e, ex);
+        *     }
+        *     try {
+        *         lastComponent.shutdown();
+        *     } catch (Exception e) {
+        *         ex = firstOrSuppressed(e, ex);
+        *     }
+        *
+        *     if (ex != null) {
+        *         throw ex;
+        *     }
+        * }
+        * }</pre>
+        *
+        * @param newException The newly occurred exception
+        * @param previous     The previously occurred exception, possibly null.
+        *
+        * @return The new exception, if no previous exception exists, or the 
previous exception with the
+        *         new exception in the list of suppressed exceptions.
+        */
+       public static <T extends Throwable> T firstOrSuppressed(T newException, 
@Nullable T previous) {
+               checkNotNull(newException, "newException");
+
+               if (previous == null) {
+                       return newException;
+               } else {
+                       previous.addSuppressed(newException);
+                       return previous;
+               }
+       }
+
+       /**
         * Private constructor to prevent instantiation.
         */
        private ExceptionUtils() {

http://git-wip-us.apache.org/repos/asf/flink/blob/020da2ce/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 6f185bd..ca35417 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -237,21 +237,23 @@ public class PendingCheckpoint {
                                        executor.execute(new Runnable() {
                                                @Override
                                                public void run() {
-                                                       try {
-                                                               for (TaskState 
taskState: taskStates.values()) {
+                                                       for (TaskState 
taskState: taskStates.values()) {
+                                                               try {
                                                                        
taskState.discard(userClassLoader);
+                                                               } catch 
(Exception e) {
+                                                                       
LOG.warn("Could not properly dispose the task state " +
+                                                                               
"belonging to vertex {} of checkpoint {} and job {}.",
+                                                                               
taskState.getJobVertexID(), checkpointId, jobId, e);
                                                                }
-                                                       } catch (Exception e) {
-                                                               LOG.warn("Could 
not properly dispose the pending checkpoint " +
-                                                                       "{} of 
job {}.", checkpointId, jobId, e);
                                                        }
+
+                                                       taskStates.clear();
                                                }
                                        });
 
                                }
                        } finally {
                                discarded = true;
-                               taskStates.clear();
                                notYetAcknowledgedTasks.clear();
                                acknowledgedTasks.clear();
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/020da2ce/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousKvStateSnapshot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousKvStateSnapshot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousKvStateSnapshot.java
index c2fc8a4..db9c273 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousKvStateSnapshot.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousKvStateSnapshot.java
@@ -52,11 +52,6 @@ public abstract class AsynchronousKvStateSnapshot<K, N, S 
extends State, SD exte
        }
 
        @Override
-       public void discardState() throws Exception {
-               throw new RuntimeException("This should never be called and 
probably points to a bug.");
-       }
-
-       @Override
        public long getStateSize() throws Exception {
                throw new RuntimeException("This should never be called and 
probably points to a bug.");
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/020da2ce/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousStateHandle.java
index fee1efe..6c77c16 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousStateHandle.java
@@ -35,9 +35,4 @@ public abstract class AsynchronousStateHandle<T> implements 
StateHandle<T> {
        public final T getState(ClassLoader userCodeClassLoader) throws 
Exception {
                throw new UnsupportedOperationException("This must not be 
called. This is likely an internal bug.");
        }
-
-       @Override
-       public final void discardState() throws Exception {
-               throw new UnsupportedOperationException("This must not be 
called. This is likely an internal bug.");
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/020da2ce/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 9531974..d7204a9 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -642,7 +642,24 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
                                for (int i = 0; i < states.length; i++) {
                                        StreamOperator<?> operator = 
allOperators[i];
                                        if (operator != null) {
-                                               StreamTaskState state = 
operator.snapshotOperatorState(checkpointId, timestamp);
+                                               StreamTaskState state;
+                                               try {
+                                                       state = 
operator.snapshotOperatorState(checkpointId, timestamp);
+                                               } catch (Exception exception) {
+                                                       for (int j = 0; j < i; 
j++) {
+                                                               if (states[j] 
!= null) {
+                                                                       try {
+                                                                               
states[j].discardState();
+                                                                       } catch 
(Exception discardException) {
+                                                                               
LOG.warn("Could not discard " + j + "th operator state.", discardException);
+                                                                       }
+                                                               }
+                                                       }
+
+                                                       throw new 
Exception("Could not perform the checkpoint for " + i +
+                                                               "th operator in 
chain.", exception);
+                                               }
+
                                                if (state.getOperatorState() 
instanceof AsynchronousStateHandle) {
                                                        hasAsyncStates = true;
                                                }
@@ -876,7 +893,7 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
 
        // 
------------------------------------------------------------------------
        
-       private static class AsyncCheckpointThread extends Thread implements 
Closeable {
+       static class AsyncCheckpointThread extends Thread implements Closeable {
 
                private final StreamTask<?, ?> owner;
 
@@ -900,29 +917,47 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
                @Override
                public void run() {
                        try {
-                               for (StreamTaskState state : states) {
+                               for (int i = 0; i < states.length; i++) {
+                                       StreamTaskState state = states[i];
+
                                        if (state != null) {
-                                               if (state.getFunctionState() 
instanceof AsynchronousStateHandle) {
-                                                       
AsynchronousStateHandle<Serializable> asyncState = 
(AsynchronousStateHandle<Serializable>) state.getFunctionState();
-                                                       
state.setFunctionState(asyncState.materialize());
-                                               }
-                                               if (state.getOperatorState() 
instanceof AsynchronousStateHandle) {
-                                                       
AsynchronousStateHandle<?> asyncState = (AsynchronousStateHandle<?>) 
state.getOperatorState();
-                                                       
state.setOperatorState(asyncState.materialize());
-                                               }
-                                               if (state.getKvStates() != 
null) {
-                                                       Set<String> keys = 
state.getKvStates().keySet();
-                                                       HashMap<String, 
KvStateSnapshot<?, ?, ?, ?, ?>> kvStates = state.getKvStates();
-                                                       for (String key: keys) {
-                                                               if 
(kvStates.get(key) instanceof AsynchronousKvStateSnapshot) {
-                                                                       
AsynchronousKvStateSnapshot<?, ?, ?, ?, ?> asyncHandle = 
(AsynchronousKvStateSnapshot<?, ?, ?, ?, ?>) kvStates.get(key);
-                                                                       
kvStates.put(key, asyncHandle.materialize());
+                                               try {
+                                                       if 
(state.getFunctionState() instanceof AsynchronousStateHandle) {
+                                                               
AsynchronousStateHandle<Serializable> asyncState = 
(AsynchronousStateHandle<Serializable>) state.getFunctionState();
+                                                               
state.setFunctionState(asyncState.materialize());
+                                                       }
+                                                       if 
(state.getOperatorState() instanceof AsynchronousStateHandle) {
+                                                               
AsynchronousStateHandle<?> asyncState = (AsynchronousStateHandle<?>) 
state.getOperatorState();
+                                                               
state.setOperatorState(asyncState.materialize());
+                                                       }
+                                                       if (state.getKvStates() 
!= null) {
+                                                               Set<String> 
keys = state.getKvStates().keySet();
+                                                               HashMap<String, 
KvStateSnapshot<?, ?, ?, ?, ?>> kvStates = state.getKvStates();
+                                                               for (String key 
: keys) {
+                                                                       if 
(kvStates.get(key) instanceof AsynchronousKvStateSnapshot) {
+                                                                               
AsynchronousKvStateSnapshot<?, ?, ?, ?, ?> asyncHandle = 
(AsynchronousKvStateSnapshot<?, ?, ?, ?, ?>) kvStates.get(key);
+                                                                               
kvStates.put(key, asyncHandle.materialize());
+                                                                       }
+                                                               }
+                                                       }
+                                               } catch (Exception exception) {
+                                                       // Discard the states of all operators, skipping null entries as the loop above
+                                                       // does, so that the failed checkpoint does not leave orphaned state data behind.
+                                                       for (int j = 0; j < states.length; j++) {
+                                                               try {
+                                                                       if (states[j] != null) {
+                                                                               states[j].discardState();
+                                                                       }
+                                                               } catch (Exception discardException) {
+                                                                       LOG.warn("Could not discard the " + j + "th operator state.", discardException);
                                                                }
                                                        }
-                                               }
 
+                                                       throw new 
Exception("Could not materialize the " + i + "th operator state.", exception);
+                                               }
                                        }
                                }
+
                                StreamTaskStateList allStates = new 
StreamTaskStateList(states);
                                owner.lastCheckpointSize = 
allStates.getStateSize();
                                
owner.getEnvironment().acknowledgeCheckpoint(checkpointId, allStates);

http://git-wip-us.apache.org/repos/asf/flink/blob/020da2ce/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
index 925dd8c..2ffb489 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
@@ -102,12 +102,24 @@ public class StreamTaskState implements Serializable, 
Closeable {
                this.operatorState = null;
                this.functionState = null;
                this.kvStates = null;
+
+               Exception discardException = null;
        
                if (operatorState != null) {
-                       operatorState.discardState();
+                       try {
+                               operatorState.discardState();
+                       } catch (Exception exception) {
+                               discardException = new Exception("Could not 
discard operator state.", exception);
+                       }
                }
                if (functionState != null) {
-                       functionState.discardState();
+                       try {
+                               functionState.discardState();
+                       } catch (Exception exception) {
+                               Exception newException = new Exception("Could 
not discard function state.", exception);
+                               // keep the result: if no earlier failure exists, firstOrSuppressed returns newException itself
+                               discardException = ExceptionUtils.firstOrSuppressed(newException, 
discardException);
+                       }
                }
                if (kvStates != null) {
                        while (kvStates.size() > 0) {
@@ -115,7 +127,13 @@ public class StreamTaskState implements Serializable, 
Closeable {
                                        Iterator<KvStateSnapshot<?, ?, ?, ?, 
?>> values = kvStates.values().iterator();
                                        while (values.hasNext()) {
                                                KvStateSnapshot<?, ?, ?, ?, ?> 
s = values.next();
-                                               s.discardState();
+                                               try {
+                                                       s.discardState();
+                                               } catch (Exception exception) {
+                                                       Exception newException 
= new Exception("Could not discard key value state.", exception);
+                                                       // keep discarding the remaining kv states; report the first failure, later ones as suppressed
+                                                       
discardException = ExceptionUtils.firstOrSuppressed(newException, discardException);
+                                               }
                                                values.remove();
                                        }
                                }
@@ -124,6 +142,10 @@ public class StreamTaskState implements Serializable, 
Closeable {
                                }
                        }
                }
+
+               if (discardException != null) {
+                       throw discardException;
+               }
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/020da2ce/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
index cfaeaad..c7c957f 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
@@ -194,6 +194,11 @@ public class StreamTaskAsyncCheckpointTest {
                }
 
                @Override
+               public void discardState() throws Exception {
+                       // Intentionally a no-op (NOTE(review): presumably this test handle holds nothing to release).
+               }
+
+               @Override
                public long getStateSize() {
                        return 0;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/020da2ce/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index e8315c7..f2dad46 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -43,8 +43,10 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.AsynchronousStateHandle;
 import org.apache.flink.runtime.state.StateBackendFactory;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
@@ -53,6 +55,7 @@ import 
org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.ExceptionUtils;
@@ -60,6 +63,7 @@ import org.apache.flink.util.SerializedValue;
 
 import org.junit.Test;
 
+import org.mockito.internal.util.reflection.Whitebox;
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -69,12 +73,17 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.net.URL;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import static org.junit.Assert.assertEquals;
@@ -191,6 +200,73 @@ public class StreamTaskTest {
                assertEquals(ExecutionState.CANCELED, task.getExecutionState());
        }
 
+       /**
+        * Tests that the StreamTaskStates already created by preceding operators of the chain are 
discarded when the
+        * snapshotOperatorState method of a subsequent operator fails.
+        */
+       @Test
+       public void testStateCleanupWhenFailingCheckpoint() throws Exception {
+               final long checkpointId = 1L;
+               final long timestamp = 42L;
+
+               StreamTask<Integer, StreamOperator<Integer>> streamTask = new 
TestingStreamTask();
+               streamTask.setEnvironment(new DummyEnvironment("test task", 1, 
0));
+
+               OperatorChain<Integer> operatorChain = 
mock(OperatorChain.class);
+
+               StreamOperator<Integer> firstOperator = 
mock(StreamOperator.class);
+               StreamTaskState firstStreamTaskState = 
mock(StreamTaskState.class);
+               StreamOperator<Integer> secondOperator = 
mock(StreamOperator.class);
+               // the first operator snapshots successfully, the second one fails
+               
doReturn(firstStreamTaskState).when(firstOperator).snapshotOperatorState(anyLong(),
 anyLong());
+               doThrow(new Exception("Test 
Exception")).when(secondOperator).snapshotOperatorState(anyLong(), anyLong());
+
+               doReturn(new StreamOperator<?>[]{firstOperator, 
secondOperator}).when(operatorChain).getAllOperators();
+
+               Whitebox.setInternalState(streamTask, "operatorChain", 
operatorChain);
+               Whitebox.setInternalState(streamTask, "isRunning", true);
+
+               try {
+                       streamTask.triggerCheckpoint(checkpointId, timestamp);
+                       fail("Expected exception here.");
+               } catch (Exception expected) {
+                       // expected: the second operator's snapshot failure propagates out of triggerCheckpoint
+               }
+               // the state created by the first operator must have been discarded
+               verify(firstStreamTaskState).discardState();
+       }
+
+       /**
+        * Tests that the AsyncCheckpointThread discards all given StreamTaskStates, including those 
it has not materialized yet,
+        * when materializing one of the asynchronous state handles fails.
+        */
+       @Test
+       public void testAsyncCheckpointThreadStateCleanup() throws Exception {
+               final long checkpointId = 1L;
+               StreamTaskState firstState = mock(StreamTaskState.class);
+               StreamTaskState secondState = mock(StreamTaskState.class);
+               AsynchronousStateHandle<Integer> functionStateHandle = 
mock(AsynchronousStateHandle.class);
+               // materializing the function state of the first StreamTaskState fails
+               
doReturn(functionStateHandle).when(firstState).getFunctionState();
+               doThrow(new Exception("Test 
exception")).when(functionStateHandle).materialize();
+
+               StreamTask<Integer, StreamOperator<Integer>> owner = 
mock(StreamTask.class);
+               StreamTaskState[] states = {firstState, secondState};
+
+               StreamTask.AsyncCheckpointThread asyncCheckpointThread = new 
StreamTask.AsyncCheckpointThread(
+                       "AsyncCheckpointThread",
+                       owner,
+                       new HashSet<Closeable>(),
+                       states,
+                       checkpointId);
+               // call run() directly so that the thread body executes synchronously in the test thread
+               asyncCheckpointThread.run();
+               // every state, including the second one that was never materialized, must be discarded
+               for (StreamTaskState streamTaskState : states) {
+                       verify(streamTaskState).discardState();
+               }
+       }
+
        // 
------------------------------------------------------------------------
        //  Test Utilities
        // 
------------------------------------------------------------------------
@@ -489,4 +565,33 @@ public class StreamTaskTest {
                        interrupt();
                }
        }
+
+       // 
------------------------------------------------------------------------
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Minimal StreamTask whose lifecycle methods (init, run, cleanup, cancelTask) do nothing; used to exercise StreamTask methods such as triggerCheckpoint in isolation.
+        */
+       private static class TestingStreamTask extends StreamTask<Integer, 
StreamOperator<Integer>> {
+
+               @Override
+               protected void init() throws Exception {
+                       // no-op
+               }
+
+               @Override
+               protected void run() throws Exception {
+                       // no-op
+               }
+
+               @Override
+               protected void cleanup() throws Exception {
+                       // no-op
+               }
+
+               @Override
+               protected void cancelTask() throws Exception {
+                       // no-op
+               }
+       }
 }

Reply via email to