Repository: flink Updated Branches: refs/heads/master 97ccc1473 -> 20fff32a5
[FLINK-5985] Report no task states for stateless tasks on checkpointing Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/20fff32a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/20fff32a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/20fff32a Branch: refs/heads/master Commit: 20fff32a5c6d3385bce54a3b76696fb3063a2ab2 Parents: 97ccc14 Author: Stefan Richter <[email protected]> Authored: Fri Mar 10 17:55:45 2017 +0100 Committer: Stefan Richter <[email protected]> Committed: Fri Mar 17 18:14:56 2017 +0100 ---------------------------------------------------------------------- .../state/RocksDBKeyedStateBackend.java | 9 +- .../state/DefaultOperatorStateBackend.java | 2 +- .../apache/flink/runtime/state/DoneFuture.java | 16 +- .../state/ManagedInitializationContext.java | 3 +- .../flink/runtime/state/Snapshotable.java | 5 +- .../state/heap/HeapKeyedStateBackend.java | 8 +- .../checkpoint/PendingCheckpointTest.java | 35 ++++- .../runtime/state/OperatorStateBackendTest.java | 19 ++- .../runtime/state/StateBackendTestBase.java | 49 +++--- .../flink/runtime/state/StateUtilTest.java | 2 +- .../api/operators/OperatorSnapshotResult.java | 12 ++ .../streaming/runtime/tasks/StreamTask.java | 32 +++- .../streaming/runtime/tasks/StreamTaskTest.java | 74 ++++++++- .../test/checkpointing/SavepointITCase.java | 153 ++++++++++++++++++- 14 files changed, 365 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/20fff32a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index f585d21..5b72e03 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -240,6 +240,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { return keyGroupPrefixBytes; } + private boolean hasRegisteredState() { + return !kvStateInformation.isEmpty(); + } + /** * Triggers an asynchronous snapshot of the keyed state backend from RocksDB. This snapshot can be canceled and * is also stopped when the backend is closed through {@link #dispose()}. For each backend, this method must always @@ -267,13 +271,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { if (db != null) { - if (kvStateInformation.isEmpty()) { + if (!hasRegisteredState()) { if (LOG.isDebugEnabled()) { LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at " + timestamp + " . 
Returning null."); } - - return new DoneFuture<>(null); + return DoneFuture.nullValue(); } snapshotOperation.takeDBSnapShot(checkpointId, timestamp); http://git-wip-us.apache.org/repos/asf/flink/blob/20fff32a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java index 8dcf49e..2497a00 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java @@ -161,7 +161,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { CheckpointOptions checkpointOptions) throws Exception { if (registeredStates.isEmpty()) { - return new DoneFuture<>(null); + return DoneFuture.nullValue(); } List<OperatorBackendSerializationProxy.StateMetaInfo<?>> metaInfoList = http://git-wip-us.apache.org/repos/asf/flink/blob/20fff32a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DoneFuture.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DoneFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DoneFuture.java index 777ab69..d2d808d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DoneFuture.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DoneFuture.java @@ -30,10 +30,13 @@ import java.util.concurrent.TimeoutException; * @param <T> The type of object in this {@code Future}. 
*/ public class DoneFuture<T> implements RunnableFuture<T> { - private final T keyGroupsStateHandle; - public DoneFuture(T keyGroupsStateHandle) { - this.keyGroupsStateHandle = keyGroupsStateHandle; + + private static final DoneFuture<?> NULL_FUTURE = new DoneFuture<Object>(null); + + private final T payload; + + public DoneFuture(T payload) { + this.payload = payload; } @Override @@ -53,7 +56,7 @@ public class DoneFuture<T> implements RunnableFuture<T> { @Override public T get() throws InterruptedException, ExecutionException { - return keyGroupsStateHandle; + return payload; } @Override @@ -67,4 +70,9 @@ public class DoneFuture<T> implements RunnableFuture<T> { public void run() { } + + @SuppressWarnings("unchecked") + public static <T> DoneFuture<T> nullValue() { + return (DoneFuture<T>) NULL_FUTURE; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/20fff32a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java index 5255c43..522aca6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java @@ -36,7 +36,8 @@ import org.apache.flink.api.common.state.OperatorStateStore; public interface ManagedInitializationContext { /** - * Returns true, if some managed state was restored from the snapshot of a previous execution. + * Returns true, if state was restored from the snapshot of a previous execution. This always returns false for + * stateless tasks.
*/ boolean isRestored(); http://git-wip-us.apache.org/repos/asf/flink/blob/20fff32a/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java index 0d92b46..c7e62f0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java @@ -18,12 +18,13 @@ package org.apache.flink.runtime.state; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; + import java.util.Collection; import java.util.concurrent.RunnableFuture; -import org.apache.flink.runtime.checkpoint.CheckpointOptions; /** - * Interface for operations that can perform snapshots of their state. + * Interface for operators that can perform snapshots of their state. * * @param <S> Generic type of the state object that is created as handle to snapshots. 
*/ http://git-wip-us.apache.org/repos/asf/flink/blob/20fff32a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index 0335933..f3e4ec6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -150,6 +150,10 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { return stateTable; } + private boolean hasRegisteredState() { + return !stateTables.isEmpty(); + } + @Override public <N, V> InternalValueState<N, V> createValueState( TypeSerializer<N> namespaceSerializer, @@ -225,8 +229,8 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { final CheckpointStreamFactory streamFactory, CheckpointOptions checkpointOptions) throws Exception { - if (stateTables.isEmpty()) { - return new DoneFuture<>(null); + if (!hasRegisteredState()) { + return DoneFuture.nullValue(); } long syncStartTime = System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/flink/blob/20fff32a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java index 55b5fe0..a15684c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java @@ -24,11 +24,10 @@ import 
org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; - +import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; - import org.mockito.Mockito; import java.io.File; @@ -49,6 +48,7 @@ import static org.mockito.Matchers.anyLong; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.powermock.api.mockito.PowerMockito.when; public class PendingCheckpointTest { @@ -56,7 +56,10 @@ public class PendingCheckpointTest { private static final ExecutionAttemptID ATTEMPT_ID = new ExecutionAttemptID(); static { - ACK_TASKS.put(ATTEMPT_ID, mock(ExecutionVertex.class)); + ExecutionVertex vertex = mock(ExecutionVertex.class); + when(vertex.getMaxParallelism()).thenReturn(128); + when(vertex.getTotalNumberOfParallelSubtasks()).thenReturn(1); + ACK_TASKS.put(ATTEMPT_ID, vertex); } @Rule @@ -288,6 +291,32 @@ public class PendingCheckpointTest { } } + /** + * FLINK-5985 + * <p> + * Ensures that subtasks that acknowledge their state as 'null' are considered stateless. This means that they + * should not appear in the task states map of the checkpoint. + */ + @Test + public void testNullSubtaskStateLeadsToStatelessTask() throws Exception { + PendingCheckpoint pending = createPendingCheckpoint(CheckpointProperties.forStandardCheckpoint(), null); + pending.acknowledgeTask(ATTEMPT_ID, null, mock(CheckpointMetrics.class)); + Assert.assertTrue(pending.getTaskStates().isEmpty()); + } + + /** + * FLINK-5985 + * <p> + * This test checks the inverse of {@link #testNullSubtaskStateLeadsToStatelessTask()}. We want to test that + * subtasks that acknowledge some state are given an entry in the task states of the checkpoint.
+ */ + @Test + public void testNonNullSubtaskStateLeadsToStatefulTask() throws Exception { + PendingCheckpoint pending = createPendingCheckpoint(CheckpointProperties.forStandardCheckpoint(), null); + pending.acknowledgeTask(ATTEMPT_ID, mock(SubtaskState.class), mock(CheckpointMetrics.class)); + Assert.assertFalse(pending.getTaskStates().isEmpty()); + } + @Test public void testSetCanceller() { final CheckpointProperties props = new CheckpointProperties(false, false, true, true, true, true, true); http://git-wip-us.apache.org/repos/asf/flink/blob/20fff32a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java index 94df524..d883d6e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java @@ -23,11 +23,14 @@ import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.util.FutureUtil; +import org.junit.Assert; import org.junit.Test; import java.io.Serializable; import java.util.Collections; import java.util.Iterator; +import java.util.concurrent.RunnableFuture; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -144,6 +147,19 @@ public class OperatorStateBackendTest { } @Test + public void testSnapshotEmpty() throws Exception { + DefaultOperatorStateBackend operatorStateBackend = createNewOperatorStateBackend(); + CheckpointStreamFactory streamFactory = + 
abstractStateBackend.createStreamFactory(new JobID(), "testOperator"); + + RunnableFuture<OperatorStateHandle> snapshot = + operatorStateBackend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forFullCheckpoint()); + + OperatorStateHandle stateHandle = FutureUtil.runIfNotDoneAndGet(snapshot); + Assert.assertNull(stateHandle); + } + + @Test public void testSnapshotRestore() throws Exception { DefaultOperatorStateBackend operatorStateBackend = createNewOperatorStateBackend(); ListStateDescriptor<Serializable> stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>()); @@ -166,7 +182,8 @@ public class OperatorStateBackendTest { listState3.add(20); CheckpointStreamFactory streamFactory = abstractStateBackend.createStreamFactory(new JobID(), "testOperator"); - OperatorStateHandle stateHandle = operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forFullCheckpoint()).get(); + OperatorStateHandle stateHandle = FutureUtil.runIfNotDoneAndGet( + operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forFullCheckpoint())); try { http://git-wip-us.apache.org/repos/asf/flink/blob/20fff32a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index 331c6bd..faa9314 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -41,8 +41,8 @@ import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.core.testutils.CheckedThread; -import 
org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.StateAssignmentOperation; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; @@ -57,6 +57,7 @@ import org.apache.flink.runtime.state.internal.InternalKvState; import org.apache.flink.runtime.state.internal.InternalValueState; import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory; import org.apache.flink.types.IntValue; +import org.apache.flink.util.FutureUtil; import org.apache.flink.util.IOUtils; import org.apache.flink.util.TestLogger; import org.junit.Assert; @@ -199,7 +200,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten assertEquals("1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); // make some more modifications backend.setCurrentKey(1); @@ -210,7 +211,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten state.update("u3"); // draw another snapshot - KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyGroupsStateHandle snapshot2 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); // validate the original state backend.setCurrentKey(1); @@ -411,7 +412,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten assertEquals(13, (int) 
state2.value()); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); backend.dispose(); backend = restoreKeyedBackend( @@ -484,7 +485,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten assertEquals(42L, (long) state.value()); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); backend.dispose(); backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1); @@ -529,7 +530,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten assertEquals("1", joiner.join(getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer))); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); // make some more modifications backend.setCurrentKey(1); @@ -540,7 +541,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten state.add("u3"); // draw another snapshot - KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyGroupsStateHandle snapshot2 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); // validate the original state 
backend.setCurrentKey(1); @@ -628,7 +629,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten assertEquals("1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); // make some more modifications backend.setCurrentKey(1); @@ -639,7 +640,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten state.add("u3"); // draw another snapshot - KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyGroupsStateHandle snapshot2 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); // validate the original state backend.setCurrentKey(1); @@ -730,7 +731,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten assertEquals("Fold-Initial:,1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); // make some more modifications backend.setCurrentKey(1); @@ -742,7 +743,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten state.add(103); // draw another snapshot - KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, 
CheckpointOptions.forFullCheckpoint())); + KeyGroupsStateHandle snapshot2 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); // validate the original state backend.setCurrentKey(1); @@ -834,7 +835,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); // make some more modifications backend.setCurrentKey(1); @@ -846,7 +847,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten state.putAll(new HashMap<Integer, String>() {{ put(1031, "1031"); put(1032, "1032"); }}); // draw another snapshot - KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyGroupsStateHandle snapshot2 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); // validate the original state backend.setCurrentKey(1); @@ -1166,7 +1167,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten state.update("ShouldBeInSecondHalf"); - KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(0, 0, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyGroupsStateHandle snapshot = FutureUtil.runIfNotDoneAndGet(backend.snapshot(0, 0, streamFactory, CheckpointOptions.forFullCheckpoint())); List<KeyGroupsStateHandle> firstHalfKeyGroupStates = StateAssignmentOperation.getKeyGroupsStateHandles( Collections.singletonList(snapshot), @@ -1233,7 +1234,7 @@ 
public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten state.update("2"); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); backend.dispose(); // restore the first snapshot and validate it @@ -1284,7 +1285,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten state.add("2"); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); backend.dispose(); // restore the first snapshot and validate it @@ -1337,7 +1338,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten state.add("2"); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); backend.dispose(); // restore the first snapshot and validate it @@ -1388,7 +1389,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten state.put("2", "Second"); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); backend.dispose(); // restore the first snapshot and validate it @@ -1661,7 +1662,7 
@@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten eq(env.getJobID()), eq(env.getJobVertexId()), eq(expectedKeyGroupRange), eq("banana"), any(KvStateID.class)); - KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyGroupsStateHandle snapshot = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); backend.dispose(); @@ -1693,7 +1694,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten // draw a snapshot KeyGroupsStateHandle snapshot = - runSnapshot(backend.snapshot(682375462379L, 1, streamFactory, CheckpointOptions.forFullCheckpoint())); + FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 1, streamFactory, CheckpointOptions.forFullCheckpoint())); assertNull(snapshot); backend.dispose(); @@ -1934,12 +1935,4 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten return KvStateRequestSerializer.deserializeMap(serializedValue, userKeySerializer, userValueSerializer); } } - - private KeyGroupsStateHandle runSnapshot(RunnableFuture<KeyGroupsStateHandle> snapshotRunnableFuture) throws Exception { - if(!snapshotRunnableFuture.isDone()) { - Thread runner = new Thread(snapshotRunnableFuture); - runner.start(); - } - return snapshotRunnableFuture.get(); - } } http://git-wip-us.apache.org/repos/asf/flink/blob/20fff32a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateUtilTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateUtilTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateUtilTest.java index e59d027..d6966d0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateUtilTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateUtilTest.java @@ -30,7 +30,7 @@ public class StateUtilTest extends TestLogger { */ @Test public void testDiscardRunnableFutureWithNullValue() throws Exception { - RunnableFuture<StateHandle<?>> stateFuture = new DoneFuture<>(null); + RunnableFuture<StateHandle<?>> stateFuture = DoneFuture.nullValue(); StateUtil.discardStateFuture(stateFuture); } } http://git-wip-us.apache.org/repos/asf/flink/blob/20fff32a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java index 83697ae..b1c94cb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java @@ -121,4 +121,16 @@ public class OperatorSnapshotResult { throw exception; } } + + public boolean hasKeyedState() { + return keyedStateManagedFuture != null || keyedStateRawFuture != null; + } + + public boolean hasOperatorState() { + return operatorStateManagedFuture != null || operatorStateRawFuture != null; + } + + public boolean hasState() { + return hasKeyedState() || hasOperatorState(); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/20fff32a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index ccaa312..76b2b98 100644 --- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -57,7 +57,6 @@ import org.apache.flink.util.CollectionUtil; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FutureUtil; import org.apache.flink.util.Preconditions; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -925,14 +924,16 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> ChainedStateHandle<OperatorStateHandle> chainedOperatorStateStream = new ChainedStateHandle<>(operatorStatesStream); - SubtaskState subtaskState = new SubtaskState( + SubtaskState subtaskState = createSubtaskStateFromSnapshotStateHandles( chainedNonPartitionedOperatorsState, chainedOperatorStateBackend, chainedOperatorStateStream, keyedStateHandleBackend, keyedStateHandleStream); - if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING, CheckpointingOperation.AsynCheckpointState.COMPLETED)) { + if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING, + CheckpointingOperation.AsynCheckpointState.COMPLETED)) { + owner.getEnvironment().acknowledgeCheckpoint( checkpointMetaData.getCheckpointId(), checkpointMetrics, @@ -982,6 +983,31 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> } } + private SubtaskState createSubtaskStateFromSnapshotStateHandles( + ChainedStateHandle<StreamStateHandle> chainedNonPartitionedOperatorsState, + ChainedStateHandle<OperatorStateHandle> chainedOperatorStateBackend, + ChainedStateHandle<OperatorStateHandle> chainedOperatorStateStream, + KeyGroupsStateHandle keyedStateHandleBackend, + KeyGroupsStateHandle keyedStateHandleStream) { + + boolean hasAnyState = keyedStateHandleBackend != null + || keyedStateHandleStream != null + || !chainedOperatorStateBackend.isEmpty() + || !chainedOperatorStateStream.isEmpty() + || 
!chainedNonPartitionedOperatorsState.isEmpty(); + + // we signal a stateless task by reporting null, so that there are no attempts to assign empty state to + // stateless tasks on restore. This allows for simple job modifications that only concern stateless tasks without + // the need to assign them uids to match their (always empty) states. + return hasAnyState ? new SubtaskState( + chainedNonPartitionedOperatorsState, + chainedOperatorStateBackend, + chainedOperatorStateStream, + keyedStateHandleBackend, + keyedStateHandleStream) + : null; + } + private void cleanup() throws Exception { if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING, CheckpointingOperation.AsynCheckpointState.DISCARDED)) { LOG.debug("Cleanup AsyncCheckpointRunnable for checkpoint {} of {}.", checkpointMetaData.getCheckpointId(), owner.getName()); http://git-wip-us.apache.org/repos/asf/flink/blob/20fff32a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 3826051..d7e3d6c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.runtime.tasks; import akka.dispatch.Futures; - import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.TaskInfo; @@ -88,10 +87,9 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; import org.apache.flink.util.ExceptionUtils; import
org.apache.flink.util.SerializedValue; - import org.apache.flink.util.TestLogger; +import org.junit.Assert; import org.junit.Test; - import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; @@ -111,8 +109,10 @@ import java.io.Closeable; import java.io.IOException; import java.io.ObjectInputStream; import java.net.URL; +import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; +import java.util.List; import java.util.PriorityQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; @@ -640,6 +640,80 @@ public class StreamTaskTest extends TestLogger { verify(rawOperatorStateHandle).discardState(); } + /** + * FLINK-5985 + * + * This test ensures that empty snapshots (no operator/keyed state whatsoever) will be reported as stateless tasks. This + * happens by translating an empty {@link SubtaskState} into reporting 'null' to #acknowledgeCheckpoint. + */ + @Test + public void testEmptySubtaskStateLeadsToStatelessAcknowledgment() throws Exception { + final long checkpointId = 42L; + final long timestamp = 1L; + + TaskInfo mockTaskInfo = mock(TaskInfo.class); + + when(mockTaskInfo.getTaskNameWithSubtasks()).thenReturn("foobar"); + when(mockTaskInfo.getIndexOfThisSubtask()).thenReturn(0); + + Environment mockEnvironment = mock(Environment.class); + + // latch blocks until the async checkpoint thread acknowledges + final OneShotLatch checkpointCompletedLatch = new OneShotLatch(); + final List<SubtaskState> checkpointResult = new ArrayList<>(1); + + // we remember what is acknowledged (expected to be null as our task will snapshot empty states).
+ doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + SubtaskState subtaskState = invocationOnMock.getArgumentAt(2, SubtaskState.class); + checkpointResult.add(subtaskState); + checkpointCompletedLatch.trigger(); + return null; + } + }).when(mockEnvironment).acknowledgeCheckpoint(anyLong(), any(CheckpointMetrics.class), any(SubtaskState.class)); + + when(mockEnvironment.getTaskInfo()).thenReturn(mockTaskInfo); + + StreamTask<?, AbstractStreamOperator<?>> streamTask = mock(StreamTask.class, Mockito.CALLS_REAL_METHODS); + CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp); + streamTask.setEnvironment(mockEnvironment); + + // mock the operators + StreamOperator<?> statelessOperator = + mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class)); + + // mock the returned empty snapshot result (all state handles are null) + OperatorSnapshotResult statelessOperatorSnapshotResult = new OperatorSnapshotResult(); + when(statelessOperator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))) + .thenReturn(statelessOperatorSnapshotResult); + + // set up the task + StreamOperator<?>[] streamOperators = {statelessOperator}; + OperatorChain<Void, AbstractStreamOperator<Void>> operatorChain = mock(OperatorChain.class); + when(operatorChain.getAllOperators()).thenReturn(streamOperators); + + Whitebox.setInternalState(streamTask, "isRunning", true); + Whitebox.setInternalState(streamTask, "lock", new Object()); + Whitebox.setInternalState(streamTask, "operatorChain", operatorChain); + Whitebox.setInternalState(streamTask, "cancelables", new CloseableRegistry()); + Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration())); + java.util.concurrent.ExecutorService asyncOperationsThreadPool = Executors.newCachedThreadPool(); + Whitebox.setInternalState(streamTask, "asyncOperationsThreadPool", asyncOperationsThreadPool); + + try { + streamTask.triggerCheckpoint(checkpointMetaData,
CheckpointOptions.forFullCheckpoint()); + checkpointCompletedLatch.await(30, TimeUnit.SECONDS); + streamTask.cancel(); + } finally { + // nothing else in this test shuts down the pool injected above, so release its threads here + asyncOperationsThreadPool.shutdownNow(); + } + + // ensure that 'null' was acknowledged as subtask state + Assert.assertNull(checkpointResult.get(0)); + } + // ------------------------------------------------------------------------ // Test Utilities // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/20fff32a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java index ed45807..a5c994a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java @@ -23,9 +23,8 @@ import akka.actor.ActorSystem; import akka.testkit.JavaTestKit; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; -import java.io.FileNotFoundException; -import java.util.concurrent.CountDownLatch; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobSubmissionResult; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.functions.RichMapFunction; @@ -88,12 +87,14 @@ import scala.concurrent.duration.Deadline; import scala.concurrent.duration.FiniteDuration; import java.io.File; +import java.io.FileNotFoundException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Random; +import 
java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import static
org.apache.flink.runtime.messages.JobManagerMessages.getDisposeSavepointSuccess; @@ -107,6 +108,7 @@ import static org.junit.Assert.fail; /** * Integration test for triggering and resuming from savepoints. */ +@SuppressWarnings("serial") public class SavepointITCase extends TestLogger { private static final Logger LOG = LoggerFactory.getLogger(SavepointITCase.class); @@ -458,6 +460,152 @@ public class SavepointITCase extends TestLogger { } } + /** + * FLINK-5985 + * + * This test ensures we can restore from a savepoint under modifications to the job graph that only concern + * stateless operators. + */ + @Test + public void testCanRestoreWithModifiedStatelessOperators() throws Exception { + + // Config + int numTaskManagers = 2; + int numSlotsPerTaskManager = 2; + int parallelism = 2; + + // Test deadline + final Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow(); + + final File tmpDir = CommonTestUtils.createTempDirectory(); + final File savepointDir = new File(tmpDir, "savepoints"); + + TestingCluster flink = null; + String savepointPath; + try { + // Flink configuration + final Configuration config = new Configuration(); + config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers); + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager); + config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, + savepointDir.toURI().toString()); + + LOG.info("Flink configuration: " + config + "."); + + // Start Flink + flink = new TestingCluster(config); + LOG.info("Starting Flink cluster."); + flink.start(true); + + // Retrieve the job manager + LOG.info("Retrieving JobManager."); + ActorGateway jobManager = Await.result( + flink.leaderGateway().future(), + deadline.timeLeft()); + LOG.info("JobManager: " + jobManager + "."); + + final StatefulCounter statefulCounter = new StatefulCounter(); + StatefulCounter.resetForTest(parallelism); + + StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(parallelism); + env.addSource(new InfiniteTestSource()) + .shuffle() + .map(new MapFunction<Integer, Integer>() { + + @Override + public Integer map(Integer value) throws Exception { + return 4 * value; + } + }) + .shuffle() + .map(statefulCounter).uid("statefulCounter") + .shuffle() + .map(new MapFunction<Integer, Integer>() { + + @Override + public Integer map(Integer value) throws Exception { + return 2 * value; + } + }) + .addSink(new DiscardingSink<Integer>()); + + JobGraph originalJobGraph = env.getStreamGraph().getJobGraph(); + + JobSubmissionResult submissionResult = flink.submitJobDetached(originalJobGraph); + JobID jobID = submissionResult.getJobID(); + + // wait for the Tasks to be ready + org.junit.Assert.assertTrue("Timed out waiting for the original job to make progress", StatefulCounter.getProgressLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS)); + + Future<Object> savepointPathFuture = jobManager.ask(new TriggerSavepoint(jobID, Option.<String>empty()), deadline.timeLeft()); + savepointPath = ((TriggerSavepointSuccess) Await.result(savepointPathFuture, deadline.timeLeft())).savepointPath(); + Future<Object> savepointFuture = jobManager.ask(new RequestSavepoint(savepointPath), deadline.timeLeft()); + + ((ResponseSavepoint) Await.result(savepointFuture, deadline.timeLeft())).savepoint(); + LOG.info("Retrieved savepoint: " + savepointPath + "."); + } finally { + // Shut down the Flink cluster (thereby canceling the job). The null check keeps a + // failure while creating the cluster from being masked by a NullPointerException; + // the cluster is restarted below for the restore phase. 
+ if (flink != null) { + LOG.info("Shutting down Flink cluster."); + flink.shutdown(); + flink.awaitTermination(); + } + } + + try { + LOG.info("Restarting Flink cluster."); + flink.start(true); + + // Retrieve the job manager + LOG.info("Retrieving JobManager."); + ActorGateway jobManager = Await.result(flink.leaderGateway().future(), deadline.timeLeft()); + LOG.info("JobManager: " + jobManager + "."); + + // Reset static test helpers + StatefulCounter.resetForTest(parallelism); +
+ // Set up a fresh execution environment for the restore run + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(parallelism); + + // generate a modified job graph: the stateless map in front of the counter is dropped and the one behind it is replaced + env.addSource(new InfiniteTestSource()) + .shuffle() + .map(new StatefulCounter()).uid("statefulCounter") + .shuffle() + .map(new MapFunction<Integer, Integer>() { + + @Override + public Integer map(Integer value) throws Exception { + return value; + } + }) + .addSink(new DiscardingSink<Integer>()); + + JobGraph modifiedJobGraph = env.getStreamGraph().getJobGraph(); + + // Set the savepoint path + modifiedJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath)); + + LOG.info("Resubmitting job " + modifiedJobGraph.getJobID() + " with " + + "savepoint path " + savepointPath + " in detached mode."); + + // Submit the job + flink.submitJobDetached(modifiedJobGraph); + // Await state is restored + org.junit.Assert.assertTrue("Timed out waiting for the counter state to be restored", StatefulCounter.getRestoreLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS)); + + // Await some progress after restore + org.junit.Assert.assertTrue("Timed out waiting for progress after restore", StatefulCounter.getProgressLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS)); + } finally { + flink.shutdown(); + flink.awaitTermination(); + } + } + // ------------------------------------------------------------------------ // Test program // ------------------------------------------------------------------------ @@ -497,6 +645,7 @@ public class SavepointITCase extends TestLogger { synchronized (ctx.getCheckpointLock()) { ctx.collect(1); } + Thread.sleep(1); } }
