Repository: flink Updated Branches: refs/heads/release-1.2 99fb80be7 -> a9e74b59a
[FLINK-5985] Report no task states for stateless tasks on checkpointing Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a9e74b59 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a9e74b59 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a9e74b59 Branch: refs/heads/release-1.2 Commit: a9e74b59adabc5ad707bcd8ed0f7d5e4b75be939 Parents: 99fb80b Author: Stefan Richter <[email protected]> Authored: Tue Mar 14 16:37:02 2017 +0100 Committer: Stefan Richter <[email protected]> Committed: Mon Mar 20 12:19:35 2017 +0100 ---------------------------------------------------------------------- .../state/RocksDBKeyedStateBackend.java | 9 +- .../state/DefaultOperatorStateBackend.java | 2 +- .../apache/flink/runtime/state/DoneFuture.java | 16 +- .../state/ManagedInitializationContext.java | 3 +- .../flink/runtime/state/Snapshotable.java | 2 +- .../checkpoint/PendingCheckpointTest.java | 34 ++++- .../runtime/state/OperatorStateBackendTest.java | 19 ++- .../runtime/state/StateBackendTestBase.java | 33 ++-- .../flink/runtime/state/StateUtilTest.java | 2 +- .../api/operators/OperatorSnapshotResult.java | 12 ++ .../streaming/runtime/tasks/StreamTask.java | 35 ++++- .../streaming/runtime/tasks/StreamTaskTest.java | 74 ++++++++- .../test/checkpointing/SavepointITCase.java | 150 +++++++++++++++++++ 13 files changed, 354 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a9e74b59/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index ce010b5..cde297e 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -232,6 +232,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { return keyGroupPrefixBytes; } + private boolean hasRegisteredState() { + return !kvStateInformation.isEmpty(); + } + /** * Triggers an asynchronous snapshot of the keyed state backend from RocksDB. This snapshot can be canceled and * is also stopped when the backend is closed through {@link #dispose()}. For each backend, this method must always @@ -257,13 +261,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { if (db != null) { - if (kvStateInformation.isEmpty()) { + if (!hasRegisteredState()) { if (LOG.isDebugEnabled()) { LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at " + timestamp + " . 
Returning null."); } - - return new DoneFuture<>(null); + return DoneFuture.nullValue(); } snapshotOperation.takeDBSnapShot(checkpointId, timestamp); http://git-wip-us.apache.org/repos/asf/flink/blob/a9e74b59/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java index 1cd1da7..b4a80a7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java @@ -157,7 +157,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { long checkpointId, long timestamp, CheckpointStreamFactory streamFactory) throws Exception { if (registeredStates.isEmpty()) { - return new DoneFuture<>(null); + return DoneFuture.nullValue(); } List<OperatorBackendSerializationProxy.StateMetaInfo<?>> metaInfoList = http://git-wip-us.apache.org/repos/asf/flink/blob/a9e74b59/flink-runtime/src/main/java/org/apache/flink/runtime/state/DoneFuture.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DoneFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DoneFuture.java index 777ab69..d2d808d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DoneFuture.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DoneFuture.java @@ -30,10 +30,13 @@ import java.util.concurrent.TimeoutException; * @param <T> The type of object in this {@code Future}. 
*/ public class DoneFuture<T> implements RunnableFuture<T> { - private final T keyGroupsStateHandle; - public DoneFuture(T keyGroupsStateHandle) { - this.keyGroupsStateHandle = keyGroupsStateHandle; + private static final DoneFuture<?> NULL_FUTURE = new DoneFuture<Object>(null); + + private final T payload; + + public DoneFuture(T payload) { + this.payload = payload; } @Override @@ -53,7 +56,7 @@ public class DoneFuture<T> implements RunnableFuture<T> { @Override public T get() throws InterruptedException, ExecutionException { - return keyGroupsStateHandle; + return payload; } @Override @@ -67,4 +70,9 @@ public class DoneFuture<T> implements RunnableFuture<T> { public void run() { } + + @SuppressWarnings("unchecked") + public static <T> DoneFuture<T> nullValue() { + return (DoneFuture<T>) NULL_FUTURE; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/a9e74b59/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java index 5255c43..522aca6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java @@ -36,7 +36,8 @@ import org.apache.flink.api.common.state.OperatorStateStore; public interface ManagedInitializationContext { /** - * Returns true, if some managed state was restored from the snapshot of a previous execution. + * Returns true, if state was restored from the snapshot of a previous execution. This returns always false for + * stateless tasks. 
*/ boolean isRestored(); http://git-wip-us.apache.org/repos/asf/flink/blob/a9e74b59/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java index a4a6bc4..6bb8af7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java @@ -22,7 +22,7 @@ import java.util.Collection; import java.util.concurrent.RunnableFuture; /** - * Interface for operations that can perform snapshots of their state. + * Interface for operators that can perform snapshots of their state. * * @param <S> Generic type of the state object that is created as handle to snapshots. */ http://git-wip-us.apache.org/repos/asf/flink/blob/a9e74b59/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java index cee7dd5..82d5242 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java @@ -24,7 +24,7 @@ import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; - +import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -48,6 +48,7 @@ import static org.mockito.Matchers.anyLong; 
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.powermock.api.mockito.PowerMockito.when; public class PendingCheckpointTest { @@ -58,7 +59,10 @@ public class PendingCheckpointTest { private static final ExecutionAttemptID ATTEMPT_ID = new ExecutionAttemptID(); static { - ACK_TASKS.put(ATTEMPT_ID, mock(ExecutionVertex.class)); + ExecutionVertex vertex = mock(ExecutionVertex.class); + when(vertex.getMaxParallelism()).thenReturn(128); + when(vertex.getTotalNumberOfParallelSubtasks()).thenReturn(1); + ACK_TASKS.put(ATTEMPT_ID, vertex); } /** @@ -280,6 +284,32 @@ public class PendingCheckpointTest { } } + /** + * FLINK-5985 + * <p> + * Ensures that subtasks that acknowledge their state as 'null' are considered stateless. This means that they + * should not appear in the task states map of the checkpoint. + */ + @Test + public void testNullSubtaskStateLeadsToStatelessTask() throws Exception { + PendingCheckpoint pending = createPendingCheckpoint(CheckpointProperties.forStandardCheckpoint(), null); + pending.acknowledgeTask(ATTEMPT_ID, null, mock(CheckpointMetaData.class)); + Assert.assertTrue(pending.getTaskStates().isEmpty()); + } + + /** + * FLINK-5985 + * <p> + * This test checks the inverse of {@link #testNullSubtaskStateLeadsToStatelessTask()}. We want to test that + * subtasks that acknowledge some state are given an entry in the task states of the checkpoint. 
+ */ + @Test + public void testNonNullSubtaskStateLeadsToStatefulTask() throws Exception { + PendingCheckpoint pending = createPendingCheckpoint(CheckpointProperties.forStandardCheckpoint(), null); + pending.acknowledgeTask(ATTEMPT_ID, mock(SubtaskState.class), mock(CheckpointMetaData.class)); + Assert.assertFalse(pending.getTaskStates().isEmpty()); + } + @Test public void testSetCanceller() { final CheckpointProperties props = new CheckpointProperties(false, false, true, true, true, true, true); http://git-wip-us.apache.org/repos/asf/flink/blob/a9e74b59/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java index 5bd085f..9caec45 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java @@ -22,11 +22,14 @@ import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.util.FutureUtil; +import org.junit.Assert; import org.junit.Test; import java.io.Serializable; import java.util.Collections; import java.util.Iterator; +import java.util.concurrent.RunnableFuture; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -143,6 +146,19 @@ public class OperatorStateBackendTest { } @Test + public void testSnapshotEmpty() throws Exception { + DefaultOperatorStateBackend operatorStateBackend = createNewOperatorStateBackend(); + CheckpointStreamFactory streamFactory = + abstractStateBackend.createStreamFactory(new 
JobID(), "testOperator"); + + RunnableFuture<OperatorStateHandle> snapshot = + operatorStateBackend.snapshot(0L, 0L, streamFactory); + + OperatorStateHandle stateHandle = FutureUtil.runIfNotDoneAndGet(snapshot); + Assert.assertNull(stateHandle); + } + + @Test public void testSnapshotRestore() throws Exception { DefaultOperatorStateBackend operatorStateBackend = createNewOperatorStateBackend(); ListStateDescriptor<Serializable> stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>()); @@ -165,7 +181,8 @@ public class OperatorStateBackendTest { listState3.add(20); CheckpointStreamFactory streamFactory = abstractStateBackend.createStreamFactory(new JobID(), "testOperator"); - OperatorStateHandle stateHandle = operatorStateBackend.snapshot(1, 1, streamFactory).get(); + OperatorStateHandle stateHandle = FutureUtil.runIfNotDoneAndGet( + operatorStateBackend.snapshot(1, 1, streamFactory)); try { http://git-wip-us.apache.org/repos/asf/flink/blob/a9e74b59/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index 148c809..e821bcf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -49,6 +49,7 @@ import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; import org.apache.flink.runtime.state.heap.AbstractHeapState; import org.apache.flink.runtime.state.heap.StateTable; import org.apache.flink.types.IntValue; +import org.apache.flink.util.FutureUtil; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -184,7 +185,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten 
assertEquals("1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory)); + KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory)); // make some more modifications backend.setCurrentKey(1); @@ -195,7 +196,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten state.update("u3"); // draw another snapshot - KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory)); + KeyGroupsStateHandle snapshot2 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory)); // validate the original state backend.setCurrentKey(1); @@ -396,7 +397,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten assertEquals(13, (int) state2.value()); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory)); + KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory)); backend.dispose(); backend = restoreKeyedBackend( @@ -469,7 +470,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten assertEquals(42L, (long) state.value()); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory)); + KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory)); backend.dispose(); backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1); @@ -514,7 +515,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten assertEquals("1", joiner.join(getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer))); // draw a snapshot - 
KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory)); + KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory)); // make some more modifications backend.setCurrentKey(1); @@ -525,7 +526,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten state.add("u3"); // draw another snapshot - KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory)); + KeyGroupsStateHandle snapshot2 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory)); // validate the original state backend.setCurrentKey(1); @@ -613,7 +614,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten assertEquals("1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory)); + KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory)); // make some more modifications backend.setCurrentKey(1); @@ -624,7 +625,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten state.add("u3"); // draw another snapshot - KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory)); + KeyGroupsStateHandle snapshot2 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory)); // validate the original state backend.setCurrentKey(1); @@ -715,7 +716,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten assertEquals("Fold-Initial:,1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory)); + 
KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory)); // make some more modifications backend.setCurrentKey(1); @@ -727,7 +728,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten state.add(103); // draw another snapshot - KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory)); + KeyGroupsStateHandle snapshot2 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory)); // validate the original state backend.setCurrentKey(1); @@ -966,7 +967,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten state.update("ShouldBeInSecondHalf"); - KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(0, 0, streamFactory)); + KeyGroupsStateHandle snapshot = FutureUtil.runIfNotDoneAndGet(backend.snapshot(0, 0, streamFactory)); List<KeyGroupsStateHandle> firstHalfKeyGroupStates = StateAssignmentOperation.getKeyGroupsStateHandles( Collections.singletonList(snapshot), @@ -1033,7 +1034,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten state.update("2"); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory)); + KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory)); backend.dispose(); // restore the first snapshot and validate it @@ -1084,7 +1085,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten state.add("2"); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory)); + KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory)); backend.dispose(); // restore the first snapshot and validate it @@ -1137,7 +1138,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> 
exten state.add("2"); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory)); + KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory)); backend.dispose(); // restore the first snapshot and validate it @@ -1387,7 +1388,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten eq(env.getJobID()), eq(env.getJobVertexId()), eq(expectedKeyGroupRange), eq("banana"), any(KvStateID.class)); - KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory)); + KeyGroupsStateHandle snapshot = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory)); backend.dispose(); @@ -1418,7 +1419,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten ListStateDescriptor<String> kvId = new ListStateDescriptor<>("id", String.class); // draw a snapshot - KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(682375462379L, 1, streamFactory)); + KeyGroupsStateHandle snapshot = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 1, streamFactory)); assertNull(snapshot); backend.dispose(); http://git-wip-us.apache.org/repos/asf/flink/blob/a9e74b59/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateUtilTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateUtilTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateUtilTest.java index e59d027..d6966d0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateUtilTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateUtilTest.java @@ -30,7 +30,7 @@ public class StateUtilTest extends TestLogger { */ @Test public void testDiscardRunnableFutureWithNullValue() throws Exception { - RunnableFuture<StateHandle<?>> stateFuture = 
new DoneFuture<>(null); + RunnableFuture<StateHandle<?>> stateFuture = DoneFuture.nullValue(); StateUtil.discardStateFuture(stateFuture); } } http://git-wip-us.apache.org/repos/asf/flink/blob/a9e74b59/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java index 5a6c37b..f3dc2b1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java @@ -121,4 +121,16 @@ public class OperatorSnapshotResult { throw exception; } } + + public boolean hasKeyedState() { + return keyedStateManagedFuture != null || keyedStateRawFuture != null; + } + + public boolean hasOperatorState() { + return operatorStateManagedFuture != null || operatorStateRawFuture != null; + } + + public boolean hasState() { + return hasKeyedState() || hasOperatorState(); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/a9e74b59/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 1e05f19..d79edb4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -933,14 +933,16 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> 
ChainedStateHandle<OperatorStateHandle> chainedOperatorStateStream = new ChainedStateHandle<>(operatorStatesStream); - SubtaskState subtaskState = new SubtaskState( + SubtaskState subtaskState = createSubtaskStateFromSnapshotStateHandles( chainedNonPartitionedOperatorsState, chainedOperatorStateBackend, chainedOperatorStateStream, keyedStateHandleBackend, keyedStateHandleStream); - if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING, CheckpointingOperation.AsynCheckpointState.COMPLETED)) { + if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING, + CheckpointingOperation.AsynCheckpointState.COMPLETED)) { + owner.getEnvironment().acknowledgeCheckpoint(checkpointMetaData, subtaskState); if (LOG.isDebugEnabled()) { @@ -958,7 +960,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> asyncCheckpointState.compareAndSet( CheckpointingOperation.AsynCheckpointState.COMPLETED, CheckpointingOperation.AsynCheckpointState.RUNNING); - + try { cleanup(); } catch (Exception cleanupException) { @@ -987,6 +989,31 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> } } + private SubtaskState createSubtaskStateFromSnapshotStateHandles( + ChainedStateHandle<StreamStateHandle> chainedNonPartitionedOperatorsState, + ChainedStateHandle<OperatorStateHandle> chainedOperatorStateBackend, + ChainedStateHandle<OperatorStateHandle> chainedOperatorStateStream, + KeyGroupsStateHandle keyedStateHandleBackend, + KeyGroupsStateHandle keyedStateHandleStream) { + + boolean hasAnyState = keyedStateHandleBackend != null + || keyedStateHandleStream != null + || !chainedOperatorStateBackend.isEmpty() + || !chainedOperatorStateStream.isEmpty() + || !chainedNonPartitionedOperatorsState.isEmpty(); + + // we signal a stateless task by reporting null, so that there are no attempts to assign empty state to + // stateless tasks on restore. 
This allows for simple job modifications that only concern stateless operators without + // the need to assign them uids to match their (always empty) states. + return hasAnyState ? new SubtaskState( + chainedNonPartitionedOperatorsState, + chainedOperatorStateBackend, + chainedOperatorStateStream, + keyedStateHandleBackend, + keyedStateHandleStream) + : null; + } + private void cleanup() throws Exception { if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING, CheckpointingOperation.AsynCheckpointState.DISCARDED)) { LOG.debug("Cleanup AsyncCheckpointRunnable for checkpoint {} of {}.", checkpointMetaData.getCheckpointId(), owner.getName()); @@ -1122,7 +1149,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> @SuppressWarnings("deprecation") private void checkpointStreamOperator(StreamOperator<?> op) throws Exception { if (null != op) { - // first call the legacy checkpoint code paths + // first call the legacy checkpoint code paths nonPartitionedStates.add(op.snapshotLegacyOperatorState( checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp())); http://git-wip-us.apache.org/repos/asf/flink/blob/a9e74b59/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 6075dc5..4dc3095 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.runtime.tasks; import akka.dispatch.Futures; - import com.google.common.util.concurrent.MoreExecutors; import 
org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; @@ -84,10 +83,9 @@ import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.SerializedValue; - import org.apache.flink.util.TestLogger; +import org.junit.Assert; import org.junit.Test; - import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; @@ -107,8 +105,10 @@ import java.io.Closeable; import java.io.IOException; import java.io.ObjectInputStream; import java.net.URL; +import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; +import java.util.List; import java.util.PriorityQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; @@ -636,6 +636,74 @@ public class StreamTaskTest extends TestLogger { verify(rawOperatorStateHandle).discardState(); } + /** + * FLINK-5985 + * + * This test ensures that empty snapshots (no op/keyed stated whatsoever) will be reported as stateless tasks. This + * happens by translating an empty {@link SubtaskState} into reporting 'null' to #acknowledgeCheckpoint. + */ + @Test + public void testEmptySubtaskStateLeadsToStatelessAcknowledgment() throws Exception { + final long checkpointId = 42L; + final long timestamp = 1L; + + TaskInfo mockTaskInfo = mock(TaskInfo.class); + + when(mockTaskInfo.getTaskNameWithSubtasks()).thenReturn("foobar"); + when(mockTaskInfo.getIndexOfThisSubtask()).thenReturn(0); + + Environment mockEnvironment = mock(Environment.class); + + // latch blocks until the async checkpoint thread acknowledges + final OneShotLatch checkpointCompletedLatch = new OneShotLatch(); + final List<SubtaskState> checkpointResult = new ArrayList<>(1); + + // we remember what is acknowledged (expected to be null as our task will snapshot empty states). 
+ doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + SubtaskState subtaskState = invocationOnMock.getArgumentAt(1, SubtaskState.class); + checkpointResult.add(subtaskState); + checkpointCompletedLatch.trigger(); + return null; + } + }).when(mockEnvironment).acknowledgeCheckpoint(any(CheckpointMetaData.class), any(SubtaskState.class)); + + when(mockEnvironment.getTaskInfo()).thenReturn(mockTaskInfo); + + StreamTask<?, AbstractStreamOperator<?>> streamTask = mock(StreamTask.class, Mockito.CALLS_REAL_METHODS); + CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp); + streamTask.setEnvironment(mockEnvironment); + + // mock the operators + StreamOperator<?> statelessOperator = + mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class)); + + // mock the returned empty snapshot result (all state handles are null) + OperatorSnapshotResult statelessOperatorSnapshotResult = new OperatorSnapshotResult(); + when(statelessOperator.snapshotState(anyLong(), anyLong())) + .thenReturn(statelessOperatorSnapshotResult); + + // set up the task + StreamOperator<?>[] streamOperators = {statelessOperator}; + OperatorChain<Void, AbstractStreamOperator<Void>> operatorChain = mock(OperatorChain.class); + when(operatorChain.getAllOperators()).thenReturn(streamOperators); + + Whitebox.setInternalState(streamTask, "isRunning", true); + Whitebox.setInternalState(streamTask, "lock", new Object()); + Whitebox.setInternalState(streamTask, "operatorChain", operatorChain); + Whitebox.setInternalState(streamTask, "cancelables", new CloseableRegistry()); + Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration())); + Whitebox.setInternalState(streamTask, "asyncOperationsThreadPool", Executors.newCachedThreadPool()); + + streamTask.triggerCheckpoint(checkpointMetaData); + checkpointCompletedLatch.await(30, TimeUnit.SECONDS); + 
streamTask.cancel(); + + // ensure that 'null' was acknowledged as subtask state + Assert.assertNull(checkpointResult.get(0)); + } + // ------------------------------------------------------------------------ // Test Utilities // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/a9e74b59/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java index 77777d1..72b747a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java @@ -25,6 +25,7 @@ import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; import org.apache.commons.io.FileUtils; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobSubmissionResult; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.functions.RichMapFunction; @@ -476,6 +477,149 @@ public class SavepointITCase extends TestLogger { } } + /** + * FLINK-5985 + * + * This test ensures we can restore from a savepoint under modifications to the job graph that only concern + * stateless operators. 
+ */ + @Test + public void testCanRestoreWithModifiedStatelessOperators() throws Exception { + + // Config + int numTaskManagers = 2; + int numSlotsPerTaskManager = 4; + int parallelism = 2; + + // Test deadline + final Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow(); + + final File tmpDir = CommonTestUtils.createTempDirectory(); + final File savepointDir = new File(tmpDir, "savepoints"); + + TestingCluster flink = null; + String savepointPath; + try { + // Flink configuration + final Configuration config = new Configuration(); + config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers); + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager); + config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, + savepointDir.toURI().toString()); + + LOG.info("Flink configuration: " + config + "."); + + // Start Flink + flink = new TestingCluster(config); + LOG.info("Starting Flink cluster."); + flink.start(true); + + // Retrieve the job manager + LOG.info("Retrieving JobManager."); + ActorGateway jobManager = Await.result( + flink.leaderGateway().future(), + deadline.timeLeft()); + LOG.info("JobManager: " + jobManager + "."); + + final StatefulCounter statefulCounter = new StatefulCounter(); + StatefulCounter.resetForTest(); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(parallelism); + env.addSource(new InfiniteTestSource()) + .shuffle() + .map(new MapFunction<Integer, Integer>() { + + @Override + public Integer map(Integer value) throws Exception { + return 4 * value; + } + }) + .shuffle() + .map(statefulCounter).uid("statefulCounter") + .shuffle() + .map(new MapFunction<Integer, Integer>() { + + @Override + public Integer map(Integer value) throws Exception { + return 2 * value; + } + }) + .addSink(new DiscardingSink<Integer>()); + + JobGraph originalJobGraph = env.getStreamGraph().getJobGraph(); + + JobSubmissionResult 
submissionResult = flink.submitJobDetached(originalJobGraph); + JobID jobID = submissionResult.getJobID(); + + LOG.info("Waiting for some progress."); + StatefulCounter.progressLatch.await(60, TimeUnit.SECONDS); + + Future<Object> savepointPathFuture = jobManager.ask(new TriggerSavepoint(jobID, Option.<String>empty()), deadline.timeLeft()); + savepointPath = ((TriggerSavepointSuccess) Await.result(savepointPathFuture, deadline.timeLeft())).savepointPath(); + Future<Object> savepointFuture = jobManager.ask(new RequestSavepoint(savepointPath), deadline.timeLeft()); + + ((ResponseSavepoint) Await.result(savepointFuture, deadline.timeLeft())).savepoint(); + LOG.info("Retrieved savepoint: " + savepointPath + "."); + + // Shut down the Flink cluster (thereby canceling the job) + LOG.info("Shutting down Flink cluster."); + flink.shutdown(); + flink.awaitTermination(); + + } finally { + flink.shutdown(); + flink.awaitTermination(); + } + + try { + LOG.info("Restarting Flink cluster."); + flink.start(true); + + // Retrieve the job manager + LOG.info("Retrieving JobManager."); + ActorGateway jobManager = Await.result(flink.leaderGateway().future(), deadline.timeLeft()); + LOG.info("JobManager: " + jobManager + "."); + + // Reset static test helpers + StatefulCounter.resetForTest(); + + // Set up the execution environment for the modified job + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(parallelism); + + // generate a modified job graph that changes only stateless operators (drops the first map, replaces the last one) + env.addSource(new InfiniteTestSource()) + .shuffle() + .map(new StatefulCounter()).uid("statefulCounter") + .shuffle() + .map(new MapFunction<Integer, Integer>() { + + @Override + public Integer map(Integer value) throws Exception { + return value; + } + }) + .addSink(new DiscardingSink<Integer>()); + + JobGraph modifiedJobGraph = env.getStreamGraph().getJobGraph(); + + // Set the savepoint path + 
modifiedJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath)); + + LOG.info("Resubmitting job " + modifiedJobGraph.getJobID() + " with " + + "savepoint path " + savepointPath + " in detached mode."); + + // Submit the job + flink.submitJobDetached(modifiedJobGraph); + // Wait until the restored job has processed at least one record + StatefulCounter.progressLatch.await(60, TimeUnit.SECONDS); + } finally { + flink.shutdown(); + flink.awaitTermination(); + } + } + // ------------------------------------------------------------------------ // Test program // ------------------------------------------------------------------------ @@ -529,10 +673,12 @@ public class SavepointITCase extends TestLogger { implements ListCheckpointed<byte[]>, CheckpointListener { private static final Object checkpointLock = new Object(); + private static volatile OneShotLatch progressLatch = new OneShotLatch(); private static int numCompleteCalls; private static int numRestoreCalls; private static boolean restoredFromCheckpoint; + private static final long serialVersionUID = 7317800376639115920L; private byte[] data; @@ -551,6 +697,9 @@ public class SavepointITCase extends TestLogger { for (int i = 0; i < data.length; i++) { data[i] += 1; } + + StatefulCounter.progressLatch.trigger(); + return value; } @@ -589,6 +738,7 @@ public class SavepointITCase extends TestLogger { numCompleteCalls = 0; numRestoreCalls = 0; restoredFromCheckpoint = false; + progressLatch = new OneShotLatch(); } }
