Repository: flink
Updated Branches:
  refs/heads/release-1.2 99fb80be7 -> a9e74b59a


[FLINK-5985] Report no task states for stateless tasks on checkpointing


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a9e74b59
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a9e74b59
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a9e74b59

Branch: refs/heads/release-1.2
Commit: a9e74b59adabc5ad707bcd8ed0f7d5e4b75be939
Parents: 99fb80b
Author: Stefan Richter <[email protected]>
Authored: Tue Mar 14 16:37:02 2017 +0100
Committer: Stefan Richter <[email protected]>
Committed: Mon Mar 20 12:19:35 2017 +0100

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         |   9 +-
 .../state/DefaultOperatorStateBackend.java      |   2 +-
 .../apache/flink/runtime/state/DoneFuture.java  |  16 +-
 .../state/ManagedInitializationContext.java     |   3 +-
 .../flink/runtime/state/Snapshotable.java       |   2 +-
 .../checkpoint/PendingCheckpointTest.java       |  34 ++++-
 .../runtime/state/OperatorStateBackendTest.java |  19 ++-
 .../runtime/state/StateBackendTestBase.java     |  33 ++--
 .../flink/runtime/state/StateUtilTest.java      |   2 +-
 .../api/operators/OperatorSnapshotResult.java   |  12 ++
 .../streaming/runtime/tasks/StreamTask.java     |  35 ++++-
 .../streaming/runtime/tasks/StreamTaskTest.java |  74 ++++++++-
 .../test/checkpointing/SavepointITCase.java     | 150 +++++++++++++++++++
 13 files changed, 354 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a9e74b59/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index ce010b5..cde297e 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -232,6 +232,10 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                return keyGroupPrefixBytes;
        }
 
+       private boolean hasRegisteredState() {
+               return !kvStateInformation.isEmpty();
+       }
+
        /**
         * Triggers an asynchronous snapshot of the keyed state backend from 
RocksDB. This snapshot can be canceled and
         * is also stopped when the backend is closed through {@link 
#dispose()}. For each backend, this method must always
@@ -257,13 +261,12 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                        if (db != null) {
 
-                               if (kvStateInformation.isEmpty()) {
+                               if (!hasRegisteredState()) {
                                        if (LOG.isDebugEnabled()) {
                                                LOG.debug("Asynchronous RocksDB 
snapshot performed on empty keyed state at " + timestamp +
                                                                " . Returning 
null.");
                                        }
-
-                                       return new DoneFuture<>(null);
+                                       return DoneFuture.nullValue();
                                }
 
                                snapshotOperation.takeDBSnapShot(checkpointId, 
timestamp);

http://git-wip-us.apache.org/repos/asf/flink/blob/a9e74b59/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index 1cd1da7..b4a80a7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -157,7 +157,7 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
                        long checkpointId, long timestamp, 
CheckpointStreamFactory streamFactory) throws Exception {
 
                if (registeredStates.isEmpty()) {
-                       return new DoneFuture<>(null);
+                       return DoneFuture.nullValue();
                }
 
                List<OperatorBackendSerializationProxy.StateMetaInfo<?>> 
metaInfoList =

http://git-wip-us.apache.org/repos/asf/flink/blob/a9e74b59/flink-runtime/src/main/java/org/apache/flink/runtime/state/DoneFuture.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DoneFuture.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DoneFuture.java
index 777ab69..d2d808d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DoneFuture.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DoneFuture.java
@@ -30,10 +30,13 @@ import java.util.concurrent.TimeoutException;
  * @param <T> The type of object in this {@code Future}.
  */
 public class DoneFuture<T> implements RunnableFuture<T> {
-       private final T keyGroupsStateHandle;
 
-       public DoneFuture(T keyGroupsStateHandle) {
-               this.keyGroupsStateHandle = keyGroupsStateHandle;
+       private static final DoneFuture<?> NULL_FUTURE = new 
DoneFuture<Object>(null);
+
+       private final T payload;
+
+       public DoneFuture(T payload) {
+               this.payload = payload;
        }
 
        @Override
@@ -53,7 +56,7 @@ public class DoneFuture<T> implements RunnableFuture<T> {
 
        @Override
        public T get() throws InterruptedException, ExecutionException {
-               return keyGroupsStateHandle;
+               return payload;
        }
 
        @Override
@@ -67,4 +70,9 @@ public class DoneFuture<T> implements RunnableFuture<T> {
        public void run() {
 
        }
+
+       @SuppressWarnings("unchecked")
+       public static <T> DoneFuture<T> nullValue() {
+               return (DoneFuture<T>) NULL_FUTURE;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a9e74b59/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java
index 5255c43..522aca6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java
@@ -36,7 +36,8 @@ import org.apache.flink.api.common.state.OperatorStateStore;
 public interface ManagedInitializationContext {
 
        /**
-        * Returns true, if some managed state was restored from the snapshot 
of a previous execution.
+        * Returns true, if state was restored from the snapshot of a previous 
execution. This always returns false for
+        * stateless tasks.
         */
        boolean isRestored();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a9e74b59/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java
index a4a6bc4..6bb8af7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java
@@ -22,7 +22,7 @@ import java.util.Collection;
 import java.util.concurrent.RunnableFuture;
 
 /**
- * Interface for operations that can perform snapshots of their state.
+ * Interface for operators that can perform snapshots of their state.
  *
  * @param <S> Generic type of the state object that is created as handle to 
snapshots.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/a9e74b59/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index cee7dd5..82d5242 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-
+import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -48,6 +48,7 @@ import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.when;
 
 public class PendingCheckpointTest {
 
@@ -58,7 +59,10 @@ public class PendingCheckpointTest {
        private static final ExecutionAttemptID ATTEMPT_ID = new 
ExecutionAttemptID();
 
        static {
-               ACK_TASKS.put(ATTEMPT_ID, mock(ExecutionVertex.class));
+               ExecutionVertex vertex = mock(ExecutionVertex.class);
+               when(vertex.getMaxParallelism()).thenReturn(128);
+               when(vertex.getTotalNumberOfParallelSubtasks()).thenReturn(1);
+               ACK_TASKS.put(ATTEMPT_ID, vertex);
        }
 
        /**
@@ -280,6 +284,32 @@ public class PendingCheckpointTest {
                }
        }
 
+       /**
+        * FLINK-5985
+        * <p>
+        * Ensures that subtasks that acknowledge their state as 'null' are 
considered stateless. This means that they
+        * should not appear in the task states map of the checkpoint.
+        */
+       @Test
+       public void testNullSubtaskStateLeadsToStatelessTask() throws Exception 
{
+               PendingCheckpoint pending = 
createPendingCheckpoint(CheckpointProperties.forStandardCheckpoint(), null);
+               pending.acknowledgeTask(ATTEMPT_ID, null, 
mock(CheckpointMetaData.class));
+               Assert.assertTrue(pending.getTaskStates().isEmpty());
+       }
+
+       /**
+        * FLINK-5985
+        * <p>
+        * This test checks the inverse of {@link 
#testNullSubtaskStateLeadsToStatelessTask()}. We want to test that
+        * subtasks that acknowledge some state are given an entry in the 
task states of the checkpoint.
+        */
+       @Test
+       public void testNonNullSubtaskStateLeadsToStatefulTask() throws 
Exception {
+               PendingCheckpoint pending = 
createPendingCheckpoint(CheckpointProperties.forStandardCheckpoint(), null);
+               pending.acknowledgeTask(ATTEMPT_ID, mock(SubtaskState.class), 
mock(CheckpointMetaData.class));
+               Assert.assertFalse(pending.getTaskStates().isEmpty());
+       }
+
        @Test
        public void testSetCanceller() {
                final CheckpointProperties props = new 
CheckpointProperties(false, false, true, true, true, true, true);

http://git-wip-us.apache.org/repos/asf/flink/blob/a9e74b59/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index 5bd085f..9caec45 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -22,11 +22,14 @@ import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.util.FutureUtil;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.concurrent.RunnableFuture;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -143,6 +146,19 @@ public class OperatorStateBackendTest {
        }
 
        @Test
+       public void testSnapshotEmpty() throws Exception {
+               DefaultOperatorStateBackend operatorStateBackend = 
createNewOperatorStateBackend();
+               CheckpointStreamFactory streamFactory =
+                               abstractStateBackend.createStreamFactory(new 
JobID(), "testOperator");
+
+               RunnableFuture<OperatorStateHandle> snapshot =
+                               operatorStateBackend.snapshot(0L, 0L, 
streamFactory);
+
+               OperatorStateHandle stateHandle = 
FutureUtil.runIfNotDoneAndGet(snapshot);
+               Assert.assertNull(stateHandle);
+       }
+
+       @Test
        public void testSnapshotRestore() throws Exception {
                DefaultOperatorStateBackend operatorStateBackend = 
createNewOperatorStateBackend();
                ListStateDescriptor<Serializable> stateDescriptor1 = new 
ListStateDescriptor<>("test1", new JavaSerializer<>());
@@ -165,7 +181,8 @@ public class OperatorStateBackendTest {
                listState3.add(20);
 
                CheckpointStreamFactory streamFactory = 
abstractStateBackend.createStreamFactory(new JobID(), "testOperator");
-               OperatorStateHandle stateHandle = 
operatorStateBackend.snapshot(1, 1, streamFactory).get();
+               OperatorStateHandle stateHandle = FutureUtil.runIfNotDoneAndGet(
+                               operatorStateBackend.snapshot(1, 1, 
streamFactory));
 
                try {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a9e74b59/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 148c809..e821bcf 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -49,6 +49,7 @@ import 
org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
 import org.apache.flink.runtime.state.heap.AbstractHeapState;
 import org.apache.flink.runtime.state.heap.StateTable;
 import org.apache.flink.types.IntValue;
+import org.apache.flink.util.FutureUtil;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
@@ -184,7 +185,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                assertEquals("1", getSerializedValue(kvState, 1, keySerializer, 
VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 
                // draw a snapshot
-               KeyGroupsStateHandle snapshot1 = 
runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+               KeyGroupsStateHandle snapshot1 = 
FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, 
streamFactory));
 
                // make some more modifications
                backend.setCurrentKey(1);
@@ -195,7 +196,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                state.update("u3");
 
                // draw another snapshot
-               KeyGroupsStateHandle snapshot2 = 
runSnapshot(backend.snapshot(682375462379L, 4, streamFactory));
+               KeyGroupsStateHandle snapshot2 = 
FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, 
streamFactory));
 
                // validate the original state
                backend.setCurrentKey(1);
@@ -396,7 +397,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                assertEquals(13, (int) state2.value());
 
                // draw a snapshot
-               KeyGroupsStateHandle snapshot1 = 
runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+               KeyGroupsStateHandle snapshot1 = 
FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, 
streamFactory));
 
                backend.dispose();
                backend = restoreKeyedBackend(
@@ -469,7 +470,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                assertEquals(42L, (long) state.value());
 
                // draw a snapshot
-               KeyGroupsStateHandle snapshot1 = 
runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+               KeyGroupsStateHandle snapshot1 = 
FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, 
streamFactory));
 
                backend.dispose();
                backend = restoreKeyedBackend(IntSerializer.INSTANCE, 
snapshot1);
@@ -514,7 +515,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        assertEquals("1", 
joiner.join(getSerializedList(kvState, 1, keySerializer, 
VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
 
                        // draw a snapshot
-                       KeyGroupsStateHandle snapshot1 = 
runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+                       KeyGroupsStateHandle snapshot1 = 
FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, 
streamFactory));
 
                        // make some more modifications
                        backend.setCurrentKey(1);
@@ -525,7 +526,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        state.add("u3");
 
                        // draw another snapshot
-                       KeyGroupsStateHandle snapshot2 = 
runSnapshot(backend.snapshot(682375462379L, 4, streamFactory));
+                       KeyGroupsStateHandle snapshot2 = 
FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, 
streamFactory));
 
                        // validate the original state
                        backend.setCurrentKey(1);
@@ -613,7 +614,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        assertEquals("1", getSerializedValue(kvState, 1, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 
                        // draw a snapshot
-                       KeyGroupsStateHandle snapshot1 = 
runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+                       KeyGroupsStateHandle snapshot1 = 
FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, 
streamFactory));
 
                        // make some more modifications
                        backend.setCurrentKey(1);
@@ -624,7 +625,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        state.add("u3");
 
                        // draw another snapshot
-                       KeyGroupsStateHandle snapshot2 = 
runSnapshot(backend.snapshot(682375462379L, 4, streamFactory));
+                       KeyGroupsStateHandle snapshot2 = 
FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, 
streamFactory));
 
                        // validate the original state
                        backend.setCurrentKey(1);
@@ -715,7 +716,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        assertEquals("Fold-Initial:,1", 
getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, 
namespaceSerializer, valueSerializer));
 
                        // draw a snapshot
-                       KeyGroupsStateHandle snapshot1 = 
runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+                       KeyGroupsStateHandle snapshot1 = 
FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, 
streamFactory));
 
                        // make some more modifications
                        backend.setCurrentKey(1);
@@ -727,7 +728,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        state.add(103);
 
                        // draw another snapshot
-                       KeyGroupsStateHandle snapshot2 = 
runSnapshot(backend.snapshot(682375462379L, 4, streamFactory));
+                       KeyGroupsStateHandle snapshot2 = 
FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, 
streamFactory));
 
                        // validate the original state
                        backend.setCurrentKey(1);
@@ -966,7 +967,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                state.update("ShouldBeInSecondHalf");
 
 
-               KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(0, 
0, streamFactory));
+               KeyGroupsStateHandle snapshot = 
FutureUtil.runIfNotDoneAndGet(backend.snapshot(0, 0, streamFactory));
 
                List<KeyGroupsStateHandle> firstHalfKeyGroupStates = 
StateAssignmentOperation.getKeyGroupsStateHandles(
                                Collections.singletonList(snapshot),
@@ -1033,7 +1034,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        state.update("2");
 
                        // draw a snapshot
-                       KeyGroupsStateHandle snapshot1 = 
runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+                       KeyGroupsStateHandle snapshot1 = 
FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, 
streamFactory));
 
                        backend.dispose();
                        // restore the first snapshot and validate it
@@ -1084,7 +1085,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        state.add("2");
 
                        // draw a snapshot
-                       KeyGroupsStateHandle snapshot1 = 
runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+                       KeyGroupsStateHandle snapshot1 = 
FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, 
streamFactory));
 
                        backend.dispose();
                        // restore the first snapshot and validate it
@@ -1137,7 +1138,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        state.add("2");
 
                        // draw a snapshot
-                       KeyGroupsStateHandle snapshot1 = 
runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+                       KeyGroupsStateHandle snapshot1 = 
FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, 
streamFactory));
 
                        backend.dispose();
                        // restore the first snapshot and validate it
@@ -1387,7 +1388,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                                eq(env.getJobID()), eq(env.getJobVertexId()), 
eq(expectedKeyGroupRange), eq("banana"), any(KvStateID.class));
 
 
-               KeyGroupsStateHandle snapshot = 
runSnapshot(backend.snapshot(682375462379L, 4, streamFactory));
+               KeyGroupsStateHandle snapshot = 
FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, 
streamFactory));
 
                backend.dispose();
 
@@ -1418,7 +1419,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        ListStateDescriptor<String> kvId = new 
ListStateDescriptor<>("id", String.class);
 
                        // draw a snapshot
-                       KeyGroupsStateHandle snapshot = 
runSnapshot(backend.snapshot(682375462379L, 1, streamFactory));
+                       KeyGroupsStateHandle snapshot = 
FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 1, 
streamFactory));
                        assertNull(snapshot);
                        backend.dispose();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a9e74b59/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateUtilTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateUtilTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateUtilTest.java
index e59d027..d6966d0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateUtilTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateUtilTest.java
@@ -30,7 +30,7 @@ public class StateUtilTest extends TestLogger {
         */
        @Test
        public void testDiscardRunnableFutureWithNullValue() throws Exception {
-               RunnableFuture<StateHandle<?>> stateFuture = new 
DoneFuture<>(null);
+               RunnableFuture<StateHandle<?>> stateFuture = 
DoneFuture.nullValue();
                StateUtil.discardStateFuture(stateFuture);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a9e74b59/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
index 5a6c37b..f3dc2b1 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
@@ -121,4 +121,16 @@ public class OperatorSnapshotResult {
                        throw exception;
                }
        }
+
+       public boolean hasKeyedState() {
+               return keyedStateManagedFuture != null || keyedStateRawFuture 
!= null;
+       }
+
+       public boolean hasOperatorState() {
+               return operatorStateManagedFuture != null || 
operatorStateRawFuture != null;
+       }
+
+       public boolean hasState() {
+               return hasKeyedState() || hasOperatorState();
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a9e74b59/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 1e05f19..d79edb4 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -933,14 +933,16 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                                ChainedStateHandle<OperatorStateHandle> 
chainedOperatorStateStream =
                                                new 
ChainedStateHandle<>(operatorStatesStream);
 
-                               SubtaskState subtaskState = new SubtaskState(
+                               SubtaskState subtaskState = 
createSubtaskStateFromSnapshotStateHandles(
                                                
chainedNonPartitionedOperatorsState,
                                                chainedOperatorStateBackend,
                                                chainedOperatorStateStream,
                                                keyedStateHandleBackend,
                                                keyedStateHandleStream);
 
-                               if 
(asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING,
 CheckpointingOperation.AsynCheckpointState.COMPLETED)) {
+                               if 
(asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING,
+                                               
CheckpointingOperation.AsynCheckpointState.COMPLETED)) {
+
                                        
owner.getEnvironment().acknowledgeCheckpoint(checkpointMetaData, subtaskState);
 
                                        if (LOG.isDebugEnabled()) {
@@ -958,7 +960,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                                asyncCheckpointState.compareAndSet(
                                        
CheckpointingOperation.AsynCheckpointState.COMPLETED,
                                        
CheckpointingOperation.AsynCheckpointState.RUNNING);
-                               
+
                                try {
                                        cleanup();
                                } catch (Exception cleanupException) {
@@ -987,6 +989,31 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                        }
                }
 
+               private SubtaskState createSubtaskStateFromSnapshotStateHandles(
+                               ChainedStateHandle<StreamStateHandle> 
chainedNonPartitionedOperatorsState,
+                               ChainedStateHandle<OperatorStateHandle> 
chainedOperatorStateBackend,
+                               ChainedStateHandle<OperatorStateHandle> 
chainedOperatorStateStream,
+                               KeyGroupsStateHandle keyedStateHandleBackend,
+                               KeyGroupsStateHandle keyedStateHandleStream) {
+
+                       boolean hasAnyState = keyedStateHandleBackend != null
+                                       || keyedStateHandleStream != null
+                                       || 
!chainedOperatorStateBackend.isEmpty()
+                                       || !chainedOperatorStateStream.isEmpty()
+                                       || 
!chainedNonPartitionedOperatorsState.isEmpty();
+
+                       // we signal a stateless task by reporting null, so 
that there are no attempts to assign empty state to
+                       // stateless tasks on restore. This allows for simple 
job modifications that only concern stateless tasks without
+                       // the need to assign them uids to match their (always 
empty) states.
+                       return hasAnyState ? new SubtaskState(
+                                       chainedNonPartitionedOperatorsState,
+                                       chainedOperatorStateBackend,
+                                       chainedOperatorStateStream,
+                                       keyedStateHandleBackend,
+                                       keyedStateHandleStream)
+                                       : null;
+               }
+
                private void cleanup() throws Exception {
                        if 
(asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING,
 CheckpointingOperation.AsynCheckpointState.DISCARDED)) {
                                LOG.debug("Cleanup AsyncCheckpointRunnable for 
checkpoint {} of {}.", checkpointMetaData.getCheckpointId(), owner.getName());
@@ -1122,7 +1149,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                @SuppressWarnings("deprecation")
                private void checkpointStreamOperator(StreamOperator<?> op) 
throws Exception {
                        if (null != op) {
-                               // first call the legacy checkpoint code paths 
+                               // first call the legacy checkpoint code paths
                                
nonPartitionedStates.add(op.snapshotLegacyOperatorState(
                                                
checkpointMetaData.getCheckpointId(),
                                                
checkpointMetaData.getTimestamp()));

http://git-wip-us.apache.org/repos/asf/flink/blob/a9e74b59/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 6075dc5..4dc3095 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import akka.dispatch.Futures;
-
 import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
@@ -84,10 +83,9 @@ import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.SerializedValue;
-
 import org.apache.flink.util.TestLogger;
+import org.junit.Assert;
 import org.junit.Test;
-
 import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
@@ -107,8 +105,10 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.List;
 import java.util.PriorityQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
@@ -636,6 +636,74 @@ public class StreamTaskTest extends TestLogger {
                verify(rawOperatorStateHandle).discardState();
        }
 
+       /**
+        * FLINK-5985
+        *
+        * This test ensures that empty snapshots (no op/keyed state 
whatsoever) will be reported as stateless tasks. This
+        * happens by translating an empty {@link SubtaskState} into reporting 
'null' to #acknowledgeCheckpoint.
+        */
+       @Test
+       public void testEmptySubtaskStateLeadsToStatelessAcknowledgment() 
throws Exception {
+               final long checkpointId = 42L;
+               final long timestamp = 1L;
+
+               TaskInfo mockTaskInfo = mock(TaskInfo.class);
+
+               
when(mockTaskInfo.getTaskNameWithSubtasks()).thenReturn("foobar");
+               when(mockTaskInfo.getIndexOfThisSubtask()).thenReturn(0);
+
+               Environment mockEnvironment = mock(Environment.class);
+
+               // latch blocks until the async checkpoint thread acknowledges
+               final OneShotLatch checkpointCompletedLatch = new 
OneShotLatch();
+               final List<SubtaskState> checkpointResult = new ArrayList<>(1);
+
+               // we remember what is acknowledged (expected to be null as our 
task will snapshot empty states).
+               doAnswer(new Answer() {
+                       @Override
+                       public Object answer(InvocationOnMock invocationOnMock) 
throws Throwable {
+                               SubtaskState subtaskState = 
invocationOnMock.getArgumentAt(1, SubtaskState.class);
+                               checkpointResult.add(subtaskState);
+                               checkpointCompletedLatch.trigger();
+                               return null;
+                       }
+               
}).when(mockEnvironment).acknowledgeCheckpoint(any(CheckpointMetaData.class), 
any(SubtaskState.class));
+
+               when(mockEnvironment.getTaskInfo()).thenReturn(mockTaskInfo);
+
+               StreamTask<?, AbstractStreamOperator<?>> streamTask = 
mock(StreamTask.class, Mockito.CALLS_REAL_METHODS);
+               CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(checkpointId, timestamp);
+               streamTask.setEnvironment(mockEnvironment);
+
+               // mock the operators
+               StreamOperator<?> statelessOperator =
+                               mock(StreamOperator.class, 
withSettings().extraInterfaces(StreamCheckpointedOperator.class));
+
+               // mock the returned empty snapshot result (all state handles 
are null)
+               OperatorSnapshotResult statelessOperatorSnapshotResult = new 
OperatorSnapshotResult();
+               when(statelessOperator.snapshotState(anyLong(), anyLong()))
+                               .thenReturn(statelessOperatorSnapshotResult);
+
+               // set up the task
+               StreamOperator<?>[] streamOperators = {statelessOperator};
+               OperatorChain<Void, AbstractStreamOperator<Void>> operatorChain 
= mock(OperatorChain.class);
+               
when(operatorChain.getAllOperators()).thenReturn(streamOperators);
+
+               Whitebox.setInternalState(streamTask, "isRunning", true);
+               Whitebox.setInternalState(streamTask, "lock", new Object());
+               Whitebox.setInternalState(streamTask, "operatorChain", 
operatorChain);
+               Whitebox.setInternalState(streamTask, "cancelables", new 
CloseableRegistry());
+               Whitebox.setInternalState(streamTask, "configuration", new 
StreamConfig(new Configuration()));
+               Whitebox.setInternalState(streamTask, 
"asyncOperationsThreadPool", Executors.newCachedThreadPool());
+
+               streamTask.triggerCheckpoint(checkpointMetaData);
+               checkpointCompletedLatch.await(30, TimeUnit.SECONDS);
+               streamTask.cancel();
+
+               // ensure that 'null' was acknowledged as subtask state
+               Assert.assertNull(checkpointResult.get(0));
+       }
+
        // 
------------------------------------------------------------------------
        //  Test Utilities
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/a9e74b59/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 77777d1..72b747a 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -25,6 +25,7 @@ import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
@@ -476,6 +477,149 @@ public class SavepointITCase extends TestLogger {
                }
        }
 
+       /**
+        * FLINK-5985
+        *
+        * This test ensures we can restore from a savepoint under 
modifications to the job graph that only concern
+        * stateless operators.
+        */
+       @Test
+       public void testCanRestoreWithModifiedStatelessOperators() throws 
Exception {
+
+               // Config
+               int numTaskManagers = 2;
+               int numSlotsPerTaskManager = 4;
+               int parallelism = 2;
+
+               // Test deadline
+               final Deadline deadline = new FiniteDuration(5, 
TimeUnit.MINUTES).fromNow();
+
+               final File tmpDir = CommonTestUtils.createTempDirectory();
+               final File savepointDir = new File(tmpDir, "savepoints");
+
+               TestingCluster flink = null;
+               String savepointPath;
+               try {
+                       // Flink configuration
+                       final Configuration config = new Configuration();
+                       
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
+                       
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
numSlotsPerTaskManager);
+                       
config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY,
+                                       savepointDir.toURI().toString());
+
+                       LOG.info("Flink configuration: " + config + ".");
+
+                       // Start Flink
+                       flink = new TestingCluster(config);
+                       LOG.info("Starting Flink cluster.");
+                       flink.start(true);
+
+                       // Retrieve the job manager
+                       LOG.info("Retrieving JobManager.");
+                       ActorGateway jobManager = Await.result(
+                                       flink.leaderGateway().future(),
+                                       deadline.timeLeft());
+                       LOG.info("JobManager: " + jobManager + ".");
+
+                       final StatefulCounter statefulCounter = new 
StatefulCounter();
+                       StatefulCounter.resetForTest();
+
+                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+                       env.setParallelism(parallelism);
+                       env.addSource(new InfiniteTestSource())
+                                       .shuffle()
+                                       .map(new MapFunction<Integer, 
Integer>() {
+
+                                               @Override
+                                               public Integer map(Integer 
value) throws Exception {
+                                                       return 4 * value;
+                                               }
+                                       })
+                                       .shuffle()
+                                       
.map(statefulCounter).uid("statefulCounter")
+                                       .shuffle()
+                                       .map(new MapFunction<Integer, 
Integer>() {
+
+                                               @Override
+                                               public Integer map(Integer 
value) throws Exception {
+                                                       return 2 * value;
+                                               }
+                                       })
+                                       .addSink(new DiscardingSink<Integer>());
+
+                       JobGraph originalJobGraph = 
env.getStreamGraph().getJobGraph();
+
+                       JobSubmissionResult submissionResult = 
flink.submitJobDetached(originalJobGraph);
+                       JobID jobID = submissionResult.getJobID();
+
+                       LOG.info("Waiting for some progress.");
+                       StatefulCounter.progressLatch.await(60, 
TimeUnit.SECONDS);
+
+                       Future<Object> savepointPathFuture = jobManager.ask(new 
TriggerSavepoint(jobID, Option.<String>empty()), deadline.timeLeft());
+                       savepointPath = ((TriggerSavepointSuccess) 
Await.result(savepointPathFuture, deadline.timeLeft())).savepointPath();
+                       Future<Object> savepointFuture = jobManager.ask(new 
RequestSavepoint(savepointPath), deadline.timeLeft());
+
+                       ((ResponseSavepoint) Await.result(savepointFuture, 
deadline.timeLeft())).savepoint();
+                       LOG.info("Retrieved savepoint: " + savepointPath + ".");
+
+                       // Shut down the Flink cluster (thereby canceling the 
job)
+                       LOG.info("Shutting down Flink cluster.");
+                       flink.shutdown();
+                       flink.awaitTermination();
+
+               } finally {
+                       flink.shutdown();
+                       flink.awaitTermination();
+               }
+
+               try {
+                       LOG.info("Restarting Flink cluster.");
+                       flink.start(true);
+
+                       // Retrieve the job manager
+                       LOG.info("Retrieving JobManager.");
+                       ActorGateway jobManager = 
Await.result(flink.leaderGateway().future(), deadline.timeLeft());
+                       LOG.info("JobManager: " + jobManager + ".");
+
+                       // Reset static test helpers
+                       StatefulCounter.resetForTest();
+
+                       // Build the execution environment for the modified job
+                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+                       env.setParallelism(parallelism);
+
+                       // generate a modified job graph that removes one stateless op and 
replaces another
+                       env.addSource(new InfiniteTestSource())
+                                       .shuffle()
+                                       .map(new 
StatefulCounter()).uid("statefulCounter")
+                                       .shuffle()
+                                       .map(new MapFunction<Integer, 
Integer>() {
+
+                                               @Override
+                                               public Integer map(Integer 
value) throws Exception {
+                                                       return value;
+                                               }
+                                       })
+                                       .addSink(new DiscardingSink<Integer>());
+
+                       JobGraph modifiedJobGraph = 
env.getStreamGraph().getJobGraph();
+
+                       // Set the savepoint path
+                       
modifiedJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
+
+                       LOG.info("Resubmitting job " + 
modifiedJobGraph.getJobID() + " with " +
+                                       "savepoint path " + savepointPath + " 
in detached mode.");
+
+                       // Submit the job
+                       flink.submitJobDetached(modifiedJobGraph);
+                       // Wait until the resubmitted job makes progress
+                       StatefulCounter.progressLatch.await(60, 
TimeUnit.SECONDS);
+               } finally {
+                       flink.shutdown();
+                       flink.awaitTermination();
+               }
+       }
+
        // 
------------------------------------------------------------------------
        // Test program
        // 
------------------------------------------------------------------------
@@ -529,10 +673,12 @@ public class SavepointITCase extends TestLogger {
                implements ListCheckpointed<byte[]>, CheckpointListener {
 
                private static final Object checkpointLock = new Object();
+               private static volatile OneShotLatch progressLatch = new 
OneShotLatch();
                private static int numCompleteCalls;
                private static int numRestoreCalls;
                private static boolean restoredFromCheckpoint;
 
+
                private static final long serialVersionUID = 
7317800376639115920L;
                private byte[] data;
 
@@ -551,6 +697,9 @@ public class SavepointITCase extends TestLogger {
                        for (int i = 0; i < data.length; i++) {
                                data[i] += 1;
                        }
+
+                       StatefulCounter.progressLatch.trigger();
+
                        return value;
                }
 
@@ -589,6 +738,7 @@ public class SavepointITCase extends TestLogger {
                                numCompleteCalls = 0;
                                numRestoreCalls = 0;
                                restoredFromCheckpoint = false;
+                               progressLatch = new OneShotLatch();
                        }
                }
 

Reply via email to