[FLINK-6633] Register shared state before adding to CompletedCheckpointStore
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0162543a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0162543a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0162543a Branch: refs/heads/master Commit: 0162543ac13f048ef67a6586d8a6e8021ec9dcd4 Parents: 3d119e1 Author: Stefan Richter <[email protected]> Authored: Tue May 16 12:32:05 2017 +0200 Committer: Stefan Richter <[email protected]> Committed: Fri May 19 10:57:32 2017 +0200 ---------------------------------------------------------------------- .../state/RocksDBKeyedStateBackend.java | 56 +-- .../state/RocksDBStateBackendTest.java | 88 ++++- .../runtime/checkpoint/CompletedCheckpoint.java | 144 ++----- .../flink/runtime/checkpoint/OperatorState.java | 7 - .../checkpoint/OperatorSubtaskState.java | 11 - .../StandaloneCompletedCheckpointStore.java | 4 +- .../flink/runtime/checkpoint/SubtaskState.java | 11 - .../flink/runtime/checkpoint/TaskState.java | 7 - .../ZooKeeperCompletedCheckpointStore.java | 149 +++----- .../savepoint/SavepointV2Serializer.java | 17 +- .../runtime/state/CompositeStateHandle.java | 15 +- .../state/IncrementalKeyedStateHandle.java | 171 ++++----- .../runtime/state/KeyGroupsStateHandle.java | 5 - .../state/PlaceholderStreamStateHandle.java | 44 +-- .../runtime/state/SharedStateRegistry.java | 54 +-- .../state/memory/ByteStreamStateHandle.java | 7 + .../checkpoint/CheckpointCoordinatorTest.java | 25 -- .../CompletedCheckpointStoreTest.java | 61 +-- .../checkpoint/CompletedCheckpointTest.java | 3 - .../checkpoint/PendingCheckpointTest.java | 1 - ...ZooKeeperCompletedCheckpointStoreITCase.java | 7 +- .../savepoint/CheckpointTestUtils.java | 25 +- .../state/IncrementalKeyedStateHandleTest.java | 206 ++++++++++ .../runtime/state/SharedStateRegistryTest.java | 14 +- .../runtime/state/StateBackendTestBase.java | 2 - .../RecoverableCompletedCheckpointStore.java | 5 +- 
...tractEventTimeWindowCheckpointingITCase.java | 9 +- .../JobManagerHACheckpointRecoveryITCase.java | 375 +++++++++---------- 28 files changed, 747 insertions(+), 776 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 88a759d..1f32a89 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -105,6 +105,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.PriorityQueue; +import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; import java.util.UUID; @@ -170,8 +171,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { /** True if incremental checkpointing is enabled */ private final boolean enableIncrementalCheckpointing; - /** The sst files materialized in pending checkpoints */ - private final SortedMap<Long, Map<StateHandleID, StreamStateHandle>> materializedSstFiles = new TreeMap<>(); + /** The state handle ids of all sst files materialized in snapshots for previous checkpoints */ + private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles = new TreeMap<>(); /** The identifier of the last completed checkpoint */ private long lastCompletedCheckpointId = -1; @@ -720,7 +721,7 @@ public class 
RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private final long checkpointTimestamp; /** All sst files that were part of the last previously completed checkpoint */ - private Map<StateHandleID, StreamStateHandle> baseSstFiles; + private Set<StateHandleID> baseSstFiles; /** The state meta data */ private final List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots = new ArrayList<>(); @@ -732,10 +733,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private final CloseableRegistry closeableRegistry = new CloseableRegistry(); // new sst files since the last completed checkpoint - private final Map<StateHandleID, StreamStateHandle> newSstFiles = new HashMap<>(); - - // old sst files which have been materialized in previous completed checkpoints - private final Map<StateHandleID, StreamStateHandle> oldSstFiles = new HashMap<>(); + private final Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>(); // handles to the misc files in the current snapshot private final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>(); @@ -830,7 +828,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { // use the last completed checkpoint as the comparison base. baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId); - // save meta data for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry : stateBackend.kvStateInformation.entrySet()) { @@ -867,18 +864,17 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { final StateHandleID stateHandleID = new StateHandleID(fileName); if (fileName.endsWith(SST_FILE_SUFFIX)) { - StreamStateHandle fileHandle = - baseSstFiles == null ? null : baseSstFiles.get(fileName); + final boolean existsAlready = + baseSstFiles == null ? 
false : baseSstFiles.contains(stateHandleID); - if (fileHandle == null) { - fileHandle = materializeStateData(filePath); - newSstFiles.put(stateHandleID, fileHandle); - } else { + if (existsAlready) { // we introduce a placeholder state handle, that is replaced with the // original from the shared state registry (created from a previous checkpoint) - oldSstFiles.put( + sstFiles.put( stateHandleID, - new PlaceholderStreamStateHandle(fileHandle.getStateSize())); + new PlaceholderStreamStateHandle()); + } else { + sstFiles.put(stateHandleID, materializeStateData(filePath)); } } else { StreamStateHandle fileHandle = materializeStateData(filePath); @@ -887,22 +883,17 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } } - Map<StateHandleID, StreamStateHandle> sstFiles = - new HashMap<>(newSstFiles.size() + oldSstFiles.size()); - sstFiles.putAll(newSstFiles); - sstFiles.putAll(oldSstFiles); synchronized (stateBackend.asyncSnapshotLock) { - stateBackend.materializedSstFiles.put(checkpointId, sstFiles); + stateBackend.materializedSstFiles.put(checkpointId, sstFiles.keySet()); } return new IncrementalKeyedStateHandle( stateBackend.operatorIdentifier, stateBackend.keyGroupRange, checkpointId, - newSstFiles, - oldSstFiles, + sstFiles, miscFiles, metaStateHandle); } @@ -933,7 +924,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { statesToDiscard.add(metaStateHandle); statesToDiscard.addAll(miscFiles.values()); - statesToDiscard.addAll(newSstFiles.values()); + statesToDiscard.addAll(sstFiles.values()); try { StateUtil.bestEffortDiscardAllStateObjects(statesToDiscard); @@ -1308,15 +1299,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { UUID.randomUUID().toString()); try { - final Map<StateHandleID, StreamStateHandle> newSstFiles = - restoreStateHandle.getCreatedSharedState(); - final Map<StateHandleID, StreamStateHandle> oldSstFiles = - 
restoreStateHandle.getReferencedSharedState(); + final Map<StateHandleID, StreamStateHandle> sstFiles = + restoreStateHandle.getSharedState(); final Map<StateHandleID, StreamStateHandle> miscFiles = restoreStateHandle.getPrivateState(); - readAllStateData(newSstFiles, restoreInstancePath); - readAllStateData(oldSstFiles, restoreInstancePath); + readAllStateData(sstFiles, restoreInstancePath); readAllStateData(miscFiles, restoreInstancePath); // read meta data @@ -1409,8 +1397,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { throw new IOException("Could not create RocksDB data directory."); } - createFileHardLinksInRestorePath(newSstFiles, restoreInstancePath); - createFileHardLinksInRestorePath(oldSstFiles, restoreInstancePath); + createFileHardLinksInRestorePath(sstFiles, restoreInstancePath); createFileHardLinksInRestorePath(miscFiles, restoreInstancePath); List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(); @@ -1437,10 +1424,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { // use the restore sst files as the base for succeeding checkpoints - Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>(); - sstFiles.putAll(newSstFiles); - sstFiles.putAll(oldSstFiles); - stateBackend.materializedSstFiles.put(restoreStateHandle.getCheckpointId(), sstFiles); + stateBackend.materializedSstFiles.put(restoreStateHandle.getCheckpointId(), sstFiles.keySet()); stateBackend.lastCompletedCheckpointId = restoreStateHandle.getCheckpointId(); } http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java index 9340455..89eb1d5 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java @@ -26,18 +26,22 @@ import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.core.testutils.OneShotLatch; -import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.StateBackendTestBase; +import org.apache.flink.runtime.state.StateHandleID; +import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -58,7 +62,11 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; import 
java.util.List; +import java.util.Map; +import java.util.Queue; import java.util.concurrent.RunnableFuture; import static junit.framework.TestCase.assertNotNull; @@ -67,6 +75,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verify; import static org.mockito.internal.verification.VerificationModeFactory.times; import static org.powermock.api.mockito.PowerMockito.mock; @@ -351,6 +360,83 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa assertEquals(1, allFilesInDbDir.size()); } + @Test + public void testSharedIncrementalStateDeRegistration() throws Exception { + if (enableIncrementalCheckpointing) { + AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE); + ValueStateDescriptor<String> kvId = + new ValueStateDescriptor<>("id", String.class, null); + + kvId.initializeSerializerUnlessSet(new ExecutionConfig()); + + ValueState<String> state = + backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); + + + Queue<IncrementalKeyedStateHandle> previousStateHandles = new LinkedList<>(); + SharedStateRegistry sharedStateRegistry = spy(new SharedStateRegistry()); + for (int checkpointId = 0; checkpointId < 3; ++checkpointId) { + + reset(sharedStateRegistry); + + backend.setCurrentKey(checkpointId); + state.update("Hello-" + checkpointId); + + RunnableFuture<KeyedStateHandle> snapshot = backend.snapshot( + checkpointId, + checkpointId, + createStreamFactory(), + CheckpointOptions.forFullCheckpoint()); + + snapshot.run(); + + IncrementalKeyedStateHandle stateHandle = (IncrementalKeyedStateHandle) snapshot.get(); + Map<StateHandleID, StreamStateHandle> sharedState = + new HashMap<>(stateHandle.getSharedState()); + + stateHandle.registerSharedStates(sharedStateRegistry); + + for 
(Map.Entry<StateHandleID, StreamStateHandle> e : sharedState.entrySet()) { + verify(sharedStateRegistry).registerReference( + stateHandle.createSharedStateRegistryKeyFromFileName(e.getKey()), + e.getValue()); + } + + previousStateHandles.add(stateHandle); + backend.notifyCheckpointComplete(checkpointId); + + //----------------------------------------------------------------- + + if (previousStateHandles.size() > 1) { + checkRemove(previousStateHandles.remove(), sharedStateRegistry); + } + } + + while (!previousStateHandles.isEmpty()) { + + reset(sharedStateRegistry); + + checkRemove(previousStateHandles.remove(), sharedStateRegistry); + } + + backend.close(); + backend.dispose(); + } + } + + private void checkRemove(IncrementalKeyedStateHandle remove, SharedStateRegistry registry) throws Exception { + for (StateHandleID id : remove.getSharedState().keySet()) { + verify(registry, times(0)).unregisterReference( + remove.createSharedStateRegistryKeyFromFileName(id)); + } + + remove.discardState(); + + for (StateHandleID id : remove.getSharedState().keySet()) { + verify(registry).unregisterReference( + remove.createSharedStateRegistryKeyFromFileName(id)); + } + } private void runStateUpdates() throws Exception{ for (int i = 50; i < 150; ++i) { http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java index 1ab5b41..b382080 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java @@ -25,8 +25,6 @@ import org.apache.flink.runtime.state.SharedStateRegistry; import 
org.apache.flink.runtime.state.StateUtil; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.util.ExceptionUtils; -import org.apache.flink.util.Preconditions; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -177,13 +175,13 @@ public class CompletedCheckpoint implements Serializable { } public void discardOnFailedStoring() throws Exception { - new UnstoredDiscardStategy().discard(); + doDiscard(); } public boolean discardOnSubsume(SharedStateRegistry sharedStateRegistry) throws Exception { if (props.discardOnSubsumed()) { - new StoredDiscardStrategy(sharedStateRegistry).discard(); + doDiscard(); return true; } @@ -197,7 +195,7 @@ public class CompletedCheckpoint implements Serializable { jobStatus == JobStatus.FAILED && props.discardOnJobFailed() || jobStatus == JobStatus.SUSPENDED && props.discardOnJobSuspended()) { - new StoredDiscardStrategy(sharedStateRegistry).discard(); + doDiscard(); return true; } else { if (externalPointer != null) { @@ -209,6 +207,42 @@ public class CompletedCheckpoint implements Serializable { } } + private void doDiscard() throws Exception { + + try { + // collect exceptions and continue cleanup + Exception exception = null; + + // drop the metadata, if we have some + if (externalizedMetadata != null) { + try { + externalizedMetadata.discardState(); + } catch (Exception e) { + exception = e; + } + } + + // discard private state objects + try { + StateUtil.bestEffortDiscardAllStateObjects(operatorStates.values()); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + + if (exception != null) { + throw exception; + } + } finally { + operatorStates.clear(); + + // to be null-pointer safe, copy reference to stack + CompletedCheckpointStats.DiscardCallback discardCallback = this.discardCallback; + if (discardCallback != null) { + discardCallback.notifyDiscardedCheckpoint(); + } + } + } + public long getStateSize() { long result = 0L; @@ -252,7 +286,7 @@ public 
class CompletedCheckpoint implements Serializable { /** * Register all shared states in the given registry. This is method is called - * when the completed checkpoint has been successfully added into the store. + * before the checkpoint is added into the store. * * @param sharedStateRegistry The registry where shared states are registered */ @@ -266,102 +300,4 @@ public class CompletedCheckpoint implements Serializable { public String toString() { return String.format("Checkpoint %d @ %d for %s", checkpointID, timestamp, job); } - - /** - * Base class for the discarding strategies of {@link CompletedCheckpoint}. - */ - private abstract class DiscardStrategy { - - protected Exception storedException; - - public DiscardStrategy() { - this.storedException = null; - } - - public void discard() throws Exception { - - try { - // collect exceptions and continue cleanup - storedException = null; - - doDiscardExternalizedMetaData(); - doDiscardSharedState(); - doDiscardPrivateState(); - doReportStoredExceptions(); - } finally { - clearTaskStatesAndNotifyDiscardCompleted(); - } - } - - protected void doDiscardExternalizedMetaData() { - // drop the metadata, if we have some - if (externalizedMetadata != null) { - try { - externalizedMetadata.discardState(); - } catch (Exception e) { - storedException = e; - } - } - } - - protected void doDiscardPrivateState() { - // discard private state objects - try { - StateUtil.bestEffortDiscardAllStateObjects(operatorStates.values()); - } catch (Exception e) { - storedException = ExceptionUtils.firstOrSuppressed(e, storedException); - } - } - - protected abstract void doDiscardSharedState(); - - protected void doReportStoredExceptions() throws Exception { - if (storedException != null) { - throw storedException; - } - } - - protected void clearTaskStatesAndNotifyDiscardCompleted() { - operatorStates.clear(); - // to be null-pointer safe, copy reference to stack - CompletedCheckpointStats.DiscardCallback discardCallback = - 
CompletedCheckpoint.this.discardCallback; - - if (discardCallback != null) { - discardCallback.notifyDiscardedCheckpoint(); - } - } - } - - /** - * Discard all shared states created in the checkpoint. This strategy is applied - * when the completed checkpoint fails to be added into the store. - */ - private class UnstoredDiscardStategy extends CompletedCheckpoint.DiscardStrategy { - - @Override - protected void doDiscardSharedState() { - // nothing to do because we did not register any shared state yet. unregistered, new - // shared state is then still considered private state and deleted as part of - // doDiscardPrivateState(). - } - } - - /** - * Unregister all shared states from the given registry. This is strategy is - * applied when the completed checkpoint is subsumed or the job terminates. - */ - private class StoredDiscardStrategy extends CompletedCheckpoint.DiscardStrategy { - - SharedStateRegistry sharedStateRegistry; - - public StoredDiscardStrategy(SharedStateRegistry sharedStateRegistry) { - this.sharedStateRegistry = Preconditions.checkNotNull(sharedStateRegistry); - } - - @Override - protected void doDiscardSharedState() { - sharedStateRegistry.unregisterAll(operatorStates.values()); - } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java index aa676e7..b153028 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java @@ -126,13 +126,6 @@ public class OperatorState implements CompositeStateHandle { } @Override - public void unregisterSharedStates(SharedStateRegistry 
sharedStateRegistry) { - for (OperatorSubtaskState operatorSubtaskState : operatorSubtaskStates.values()) { - operatorSubtaskState.unregisterSharedStates(sharedStateRegistry); - } - } - - @Override public long getStateSize() { long result = 0L; http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java index 49ef863..e2ae632 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java @@ -158,17 +158,6 @@ public class OperatorSubtaskState implements CompositeStateHandle { } @Override - public void unregisterSharedStates(SharedStateRegistry sharedStateRegistry) { - if (managedKeyedState != null) { - managedKeyedState.unregisterSharedStates(sharedStateRegistry); - } - - if (rawKeyedState != null) { - rawKeyedState.unregisterSharedStates(sharedStateRegistry); - } - } - - @Override public long getStateSize() { return stateSize; } http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java index f5e1db3..233cfc8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java @@ -63,10 +63,10 @@ public class StandaloneCompletedCheckpointStore extends AbstractCompletedCheckpo @Override public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception { - checkpoints.addLast(checkpoint); - checkpoint.registerSharedStates(sharedStateRegistry); + checkpoints.addLast(checkpoint); + if (checkpoints.size() > maxNumberOfCheckpointsToRetain) { try { CompletedCheckpoint checkpointToSubsume = checkpoints.removeFirst(); http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java index a77baf3..20d675b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java @@ -162,17 +162,6 @@ public class SubtaskState implements CompositeStateHandle { } @Override - public void unregisterSharedStates(SharedStateRegistry sharedStateRegistry) { - if (managedKeyedState != null) { - managedKeyedState.unregisterSharedStates(sharedStateRegistry); - } - - if (rawKeyedState != null) { - rawKeyedState.unregisterSharedStates(sharedStateRegistry); - } - } - - @Override public long getStateSize() { return stateSize; } http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java index aa5c516..ed847a4 
100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java @@ -141,13 +141,6 @@ public class TaskState implements CompositeStateHandle { } @Override - public void unregisterSharedStates(SharedStateRegistry sharedStateRegistry) { - for (SubtaskState subtaskState : subtaskStates.values()) { - subtaskState.unregisterSharedStates(sharedStateRegistry); - } - } - - @Override public long getStateSize() { long result = 0L; http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java index 084d93e..4c3c1ff 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java @@ -35,7 +35,6 @@ import javax.annotation.Nullable; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.ConcurrentModificationException; -import java.util.Iterator; import java.util.List; import java.util.concurrent.Executor; @@ -79,7 +78,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi private final int maxNumberOfCheckpointsToRetain; /** Local completed checkpoints. */ - private final ArrayDeque<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> checkpointStateHandles; + private final ArrayDeque<CompletedCheckpoint> completedCheckpoints; /** * Creates a {@link ZooKeeperCompletedCheckpointStore} instance. 
@@ -122,7 +121,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>(this.client, stateStorage, executor); - this.checkpointStateHandles = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1); + this.completedCheckpoints = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1); LOG.info("Initialized in '{}'.", checkpointsPath); } @@ -146,7 +145,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi // Clear local handles in order to prevent duplicates on // recovery. The local handles should reflect the state // of ZooKeeper. - checkpointStateHandles.clear(); + completedCheckpoints.clear(); // Get all there is first List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> initialCheckpoints; @@ -170,6 +169,11 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi try { completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle); + if (completedCheckpoint != null) { + // Re-register all shared states in the checkpoint. + completedCheckpoint.registerSharedStates(sharedStateRegistry); + completedCheckpoints.add(completedCheckpoint); + } } catch (Exception e) { LOG.warn("Could not retrieve checkpoint. 
Removing it from the completed " + "checkpoint store.", e); @@ -177,11 +181,6 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi // remove the checkpoint with broken state handle removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0); } - - if (completedCheckpoint != null) { - completedCheckpoint.registerSharedStates(sharedStateRegistry); - checkpointStateHandles.add(checkpointStateHandle); - } } } @@ -195,20 +194,19 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi checkNotNull(checkpoint, "Checkpoint"); final String path = checkpointIdToPath(checkpoint.getCheckpointID()); - final RetrievableStateHandle<CompletedCheckpoint> stateHandle; - // First add the new one. If it fails, we don't want to loose existing data. - stateHandle = checkpointsInZooKeeper.addAndLock(path, checkpoint); + // First, register all shared states in the checkpoint to consolidate placeholders. + checkpoint.registerSharedStates(sharedStateRegistry); - checkpointStateHandles.addLast(new Tuple2<>(stateHandle, path)); + // Now add the new one. If it fails, we don't want to lose existing data. + checkpointsInZooKeeper.addAndLock(path, checkpoint); - // Register all shared states in the checkpoint - checkpoint.registerSharedStates(sharedStateRegistry); + completedCheckpoints.addLast(checkpoint); // Everything worked, let's remove a previous checkpoint if necessary.
- while (checkpointStateHandles.size() > maxNumberOfCheckpointsToRetain) { + while (completedCheckpoints.size() > maxNumberOfCheckpointsToRetain) { try { - removeSubsumed(checkpointStateHandles.removeFirst().f1, sharedStateRegistry); + removeSubsumed(completedCheckpoints.removeFirst(), sharedStateRegistry); } catch (Exception e) { LOG.warn("Failed to subsume the old checkpoint", e); } @@ -219,60 +217,23 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi @Override public CompletedCheckpoint getLatestCheckpoint() { - if (checkpointStateHandles.isEmpty()) { + if (completedCheckpoints.isEmpty()) { return null; } else { - while(!checkpointStateHandles.isEmpty()) { - Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpointStateHandle = checkpointStateHandles.peekLast(); - - try { - return retrieveCompletedCheckpoint(checkpointStateHandle); - } catch (Exception e) { - LOG.warn("Could not retrieve latest checkpoint. Removing it from " + - "the completed checkpoint store.", e); - - try { - // remove the checkpoint with broken state handle - Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpoint = checkpointStateHandles.pollLast(); - removeBrokenStateHandle(checkpoint.f1, checkpoint.f0); - } catch (Exception removeException) { - LOG.warn("Could not remove the latest checkpoint with a broken state handle.", removeException); - } - } - } - - return null; + return completedCheckpoints.peekLast(); } } @Override public List<CompletedCheckpoint> getAllCheckpoints() throws Exception { - List<CompletedCheckpoint> checkpoints = new ArrayList<>(checkpointStateHandles.size()); - - Iterator<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> stateHandleIterator = checkpointStateHandles.iterator(); - - while (stateHandleIterator.hasNext()) { - Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> stateHandlePath = stateHandleIterator.next(); - - try { - 
checkpoints.add(retrieveCompletedCheckpoint(stateHandlePath)); - } catch (Exception e) { - LOG.warn("Could not retrieve checkpoint. Removing it from the completed " + - "checkpoint store.", e); - - // remove the checkpoint with broken state handle - stateHandleIterator.remove(); - removeBrokenStateHandle(stateHandlePath.f1, stateHandlePath.f0); - } - } - + List<CompletedCheckpoint> checkpoints = new ArrayList<>(completedCheckpoints); return checkpoints; } @Override public int getNumberOfRetainedCheckpoints() { - return checkpointStateHandles.size(); + return completedCheckpoints.size(); } @Override @@ -285,15 +246,15 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi if (jobStatus.isGloballyTerminalState()) { LOG.info("Shutting down"); - for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpoint : checkpointStateHandles) { + for (CompletedCheckpoint checkpoint : completedCheckpoints) { try { - removeShutdown(checkpoint.f1, jobStatus, sharedStateRegistry); + removeShutdown(checkpoint, jobStatus, sharedStateRegistry); } catch (Exception e) { LOG.error("Failed to discard checkpoint.", e); } } - checkpointStateHandles.clear(); + completedCheckpoints.clear(); String path = "/" + client.getNamespace(); @@ -303,7 +264,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi LOG.info("Suspending"); // Clear the local handles, but don't remove any state - checkpointStateHandles.clear(); + completedCheckpoints.clear(); // Release the state handle locks in ZooKeeper such that they can be deleted checkpointsInZooKeeper.releaseAll(); @@ -313,21 +274,18 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi // ------------------------------------------------------------------------ private void removeSubsumed( - final String pathInZooKeeper, + final CompletedCheckpoint completedCheckpoint, final SharedStateRegistry sharedStateRegistry) throws Exception { - - 
ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint> action = new ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() { - @Override - public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException { - if (value != null) { - final CompletedCheckpoint completedCheckpoint; - try { - completedCheckpoint = value.retrieveState(); - } catch (Exception e) { - throw new FlinkException("Could not retrieve the completed checkpoint from the given state handle.", e); - } - if (completedCheckpoint != null) { + if(completedCheckpoint == null) { + return; + } + + ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint> action = + new ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() { + @Override + public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException { + if (value != null) { try { completedCheckpoint.discardOnSubsume(sharedStateRegistry); } catch (Exception e) { @@ -335,46 +293,41 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi } } } - } - }; + }; - checkpointsInZooKeeper.releaseAndTryRemove(pathInZooKeeper, action); + checkpointsInZooKeeper.releaseAndTryRemove( + checkpointIdToPath(completedCheckpoint.getCheckpointID()), + action); } private void removeShutdown( - final String pathInZooKeeper, + final CompletedCheckpoint completedCheckpoint, final JobStatus jobStatus, final SharedStateRegistry sharedStateRegistry) throws Exception { + if(completedCheckpoint == null) { + return; + } + ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint> removeAction = new ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() { @Override public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException { - if (value != null) { - final CompletedCheckpoint completedCheckpoint; - - try { - completedCheckpoint = value.retrieveState(); - } catch (Exception e) { - throw new 
FlinkException("Could not retrieve the completed checkpoint from the given state handle.", e); - } - - if (completedCheckpoint != null) { - try { - completedCheckpoint.discardOnShutdown(jobStatus, sharedStateRegistry); - } catch (Exception e) { - throw new FlinkException("Could not discard the completed checkpoint on subsume.", e); - } - } + try { + completedCheckpoint.discardOnShutdown(jobStatus, sharedStateRegistry); + } catch (Exception e) { + throw new FlinkException("Could not discard the completed checkpoint on subsume.", e); } } }; - checkpointsInZooKeeper.releaseAndTryRemove(pathInZooKeeper, removeAction); + checkpointsInZooKeeper.releaseAndTryRemove( + checkpointIdToPath(completedCheckpoint.getCheckpointID()), + removeAction); } private void removeBrokenStateHandle( - final String pathInZooKeeper, - final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle) throws Exception { + final String pathInZooKeeper, + final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle) throws Exception { checkpointsInZooKeeper.releaseAndTryRemove(pathInZooKeeper, new ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() { @Override public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException { http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java index b71418b..da0022c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java @@ -29,7 +29,6 @@ import 
org.apache.flink.runtime.state.KeyGroupRangeOffsets; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; -import org.apache.flink.runtime.state.PlaceholderStreamStateHandle; import org.apache.flink.runtime.state.StateHandleID; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.filesystem.FileStateHandle; @@ -75,7 +74,6 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> { private static final byte KEY_GROUPS_HANDLE = 3; private static final byte PARTITIONABLE_OPERATOR_STATE_HANDLE = 4; private static final byte INCREMENTAL_KEY_GROUPS_HANDLE = 5; - private static final byte PLACEHOLDER_STREAM_STATE_HANDLE = 6; /** The singleton instance of the serializer */ public static final SavepointV2Serializer INSTANCE = new SavepointV2Serializer(); @@ -328,8 +326,7 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> { serializeStreamStateHandle(incrementalKeyedStateHandle.getMetaStateHandle(), dos); - serializeStreamStateHandleMap(incrementalKeyedStateHandle.getCreatedSharedState(), dos); - serializeStreamStateHandleMap(incrementalKeyedStateHandle.getReferencedSharedState(), dos); + serializeStreamStateHandleMap(incrementalKeyedStateHandle.getSharedState(), dos); serializeStreamStateHandleMap(incrementalKeyedStateHandle.getPrivateState(), dos); } else { throw new IllegalStateException("Unknown KeyedStateHandle type: " + stateHandle.getClass()); @@ -390,16 +387,14 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> { KeyGroupRange.of(startKeyGroup, startKeyGroup + numKeyGroups - 1); StreamStateHandle metaDataStateHandle = deserializeStreamStateHandle(dis); - Map<StateHandleID, StreamStateHandle> createdStates = deserializeStreamStateHandleMap(dis); - Map<StateHandleID, StreamStateHandle> referencedStates = deserializeStreamStateHandleMap(dis); + 
Map<StateHandleID, StreamStateHandle> sharedStates = deserializeStreamStateHandleMap(dis); Map<StateHandleID, StreamStateHandle> privateStates = deserializeStreamStateHandleMap(dis); return new IncrementalKeyedStateHandle( operatorId, keyGroupRange, checkpointId, - createdStates, - referencedStates, + sharedStates, privateStates, metaDataStateHandle); } else { @@ -485,10 +480,6 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> { byte[] internalData = byteStreamStateHandle.getData(); dos.writeInt(internalData.length); dos.write(byteStreamStateHandle.getData()); - } else if (stateHandle instanceof PlaceholderStreamStateHandle) { - PlaceholderStreamStateHandle placeholder = (PlaceholderStreamStateHandle) stateHandle; - dos.writeByte(PLACEHOLDER_STREAM_STATE_HANDLE); - dos.writeLong(placeholder.getStateSize()); } else { throw new IOException("Unknown implementation of StreamStateHandle: " + stateHandle.getClass()); } @@ -510,8 +501,6 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> { byte[] data = new byte[numBytes]; dis.readFully(data); return new ByteStreamStateHandle(handleName, data); - } else if (PLACEHOLDER_STREAM_STATE_HANDLE == type) { - return new PlaceholderStreamStateHandle(dis.readLong()); } else { throw new IOException("Unknown implementation of StreamStateHandle, code: " + type); } http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeStateHandle.java index 002b7c3..1bc6a0f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeStateHandle.java @@ -34,7 +34,8 @@ 
package org.apache.flink.runtime.state; * this handle and considered as private state until it is registered for the first time. Registration * transfers ownership to the {@link SharedStateRegistry}. * The composite state handle should only delete all private states in the - * {@link StateObject#discardState()} method. + * {@link StateObject#discardState()} method, the {@link SharedStateRegistry} is responsible for + * deleting shared states after they were registered. */ public interface CompositeStateHandle extends StateObject { @@ -45,18 +46,10 @@ public interface CompositeStateHandle extends StateObject { * <p> * After this is completed, newly created shared state is considered as published is no longer * owned by this handle. This means that it should no longer be deleted as part of calls to - * {@link #discardState()}. + * {@link #discardState()}. Instead, {@link #discardState()} will trigger an unregistration + * from the registry. * * @param stateRegistry The registry where shared states are registered. */ void registerSharedStates(SharedStateRegistry stateRegistry); - - /** - * Unregister both created and referenced shared states in the given - * {@link SharedStateRegistry}. This method is called when the checkpoint is - * subsumed or the job is shut down. - * - * @param stateRegistry The registry where shared states are registered. 
- */ - void unregisterSharedStates(SharedStateRegistry stateRegistry); } http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java index 706e219..770b5a9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java @@ -28,18 +28,24 @@ import java.util.Map; /** * The handle to states of an incremental snapshot. * <p> - * The states contained in an incremental snapshot include + * The states contained in an incremental snapshot include: * <ul> - * <li> Created shared state which includes (the supposed to be) shared files produced since the last + * <li> Created shared state which includes shared files produced since the last * completed checkpoint. These files can be referenced by succeeding checkpoints if the * checkpoint succeeds to complete. </li> * <li> Referenced shared state which includes the shared files materialized in previous - * checkpoints. </li> + * checkpoints. Until this handle is registered with a {@link SharedStateRegistry}, all referenced + * shared state handles are only placeholders, so that we do not send state handles twice + * from which we know that they already exist on the checkpoint coordinator.</li> * <li> Private state which includes all other files, typically mutable, that cannot be shared by * other checkpoints. </li> * <li> Backend meta state which includes the information of existing states.
</li> * </ul> * + * When this should become a completed checkpoint on the checkpoint coordinator, it must first be + * registered with a {@link SharedStateRegistry}, so that all placeholder state handles to + * previously existing state are replaced with the originals. + * * IMPORTANT: This class currently overrides equals and hash code only for testing purposes. They * should not be called from production code. This means this class is also not suited to serve as * a key, e.g. in hash maps. @@ -66,14 +72,9 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle { private final long checkpointId; /** - * State that the incremental checkpoint created new - */ - private final Map<StateHandleID, StreamStateHandle> createdSharedState; - - /** - * State that the incremental checkpoint references from previous checkpoints + * Shared state in the incremental checkpoint. */ - private final Map<StateHandleID, StreamStateHandle> referencedSharedState; + private final Map<StateHandleID, StreamStateHandle> sharedState; /** * Private state in the incremental checkpoint @@ -86,32 +87,30 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle { private final StreamStateHandle metaStateHandle; /** - * True if the state handle has already registered shared states. - * <p> - * Once the shared states are registered, it's the {@link SharedStateRegistry}'s - * responsibility to maintain the shared states. But in the cases where the - * state handle is discarded before performing the registration, the handle - * should delete all the shared states created by it. + * Once the shared states are registered, it is the {@link SharedStateRegistry}'s + * responsibility to clean up those shared states. + * But in the cases where the state handle is discarded before performing the registration, + * the handle should delete all the shared states created by it. + * + * This variable is not null iff the handle was registered.
*/ - private boolean registered; + private transient SharedStateRegistry sharedStateRegistry; public IncrementalKeyedStateHandle( String operatorIdentifier, KeyGroupRange keyGroupRange, long checkpointId, - Map<StateHandleID, StreamStateHandle> createdSharedState, - Map<StateHandleID, StreamStateHandle> referencedSharedState, + Map<StateHandleID, StreamStateHandle> sharedState, Map<StateHandleID, StreamStateHandle> privateState, StreamStateHandle metaStateHandle) { this.operatorIdentifier = Preconditions.checkNotNull(operatorIdentifier); this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange); this.checkpointId = checkpointId; - this.createdSharedState = Preconditions.checkNotNull(createdSharedState); - this.referencedSharedState = Preconditions.checkNotNull(referencedSharedState); + this.sharedState = Preconditions.checkNotNull(sharedState); this.privateState = Preconditions.checkNotNull(privateState); this.metaStateHandle = Preconditions.checkNotNull(metaStateHandle); - this.registered = false; + this.sharedStateRegistry = null; } @Override @@ -123,12 +122,8 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle { return checkpointId; } - public Map<StateHandleID, StreamStateHandle> getCreatedSharedState() { - return createdSharedState; - } - - public Map<StateHandleID, StreamStateHandle> getReferencedSharedState() { - return referencedSharedState; + public Map<StateHandleID, StreamStateHandle> getSharedState() { + return sharedState; } public Map<StateHandleID, StreamStateHandle> getPrivateState() { @@ -155,8 +150,6 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle { @Override public void discardState() throws Exception { - Preconditions.checkState(!registered, "Attempt to dispose a registered composite state with registered shared state. 
Must unregister first."); - try { metaStateHandle.discardState(); } catch (Exception e) { @@ -169,37 +162,35 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle { LOG.warn("Could not properly discard misc file states.", e); } - try { - StateUtil.bestEffortDiscardAllStateObjects(createdSharedState.values()); - } catch (Exception e) { - LOG.warn("Could not properly discard new sst file states.", e); + // If this was not registered, we can delete the shared state. We can simply apply this + // to all handles, because all handles that have not been created for the first time for this + // are only placeholders at this point (disposing them is a NOP). + if (sharedStateRegistry == null) { + try { + StateUtil.bestEffortDiscardAllStateObjects(sharedState.values()); + } catch (Exception e) { + LOG.warn("Could not properly discard new sst file states.", e); + } + } else { + // If this was registered, we only unregister all our referenced shared states + // from the registry. + for (StateHandleID stateHandleID : sharedState.keySet()) { + sharedStateRegistry.unregisterReference( + createSharedStateRegistryKeyFromFileName(stateHandleID)); + } } - } @Override public long getStateSize() { - long size = getPrivateStateSize(); - - for (StreamStateHandle oldSstFileHandle : referencedSharedState.values()) { - size += oldSstFileHandle.getStateSize(); - } - - return size; - } - - /** - * Returns the size of the state that is privately owned by this handle. 
- */ - public long getPrivateStateSize() { long size = StateUtil.getStateSize(metaStateHandle); - for (StreamStateHandle newSstFileHandle : createdSharedState.values()) { - size += newSstFileHandle.getStateSize(); + for (StreamStateHandle sharedStateHandle : sharedState.values()) { + size += sharedStateHandle.getStateSize(); } - for (StreamStateHandle miscFileHandle : privateState.values()) { - size += miscFileHandle.getStateSize(); + for (StreamStateHandle privateStateHandle : privateState.values()) { + size += privateStateHandle.getStateSize(); } return size; @@ -208,64 +199,38 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle { @Override public void registerSharedStates(SharedStateRegistry stateRegistry) { - Preconditions.checkState(!registered, "The state handle has already registered its shared states."); + Preconditions.checkState(sharedStateRegistry == null, "The state handle has already registered its shared states."); + + sharedStateRegistry = Preconditions.checkNotNull(stateRegistry); - for (Map.Entry<StateHandleID, StreamStateHandle> newSstFileEntry : createdSharedState.entrySet()) { + for (Map.Entry<StateHandleID, StreamStateHandle> sharedStateHandle : sharedState.entrySet()) { SharedStateRegistryKey registryKey = - createSharedStateRegistryKeyFromFileName(newSstFileEntry.getKey()); + createSharedStateRegistryKeyFromFileName(sharedStateHandle.getKey()); SharedStateRegistry.Result result = - stateRegistry.registerNewReference(registryKey, newSstFileEntry.getValue()); - - // We update our reference with the result from the registry, to prevent the following - // problem: + stateRegistry.registerReference(registryKey, sharedStateHandle.getValue()); + + // This step consolidates our shared handles with the registry, which does two things: + // + // 1) Replace placeholder state handle with already registered, actual state handles. 
+ // + // 2) Deduplicate re-uploads of incremental state due to missing confirmations about + // completed checkpoints. + // + // This prevents the following problem: // A previous checkpoint n has already registered the state. This can happen if a // following checkpoint (n + x) wants to reference the same state before the backend got // notified that checkpoint n completed. In this case, the shared registry did // deduplication and returns the previous reference. - newSstFileEntry.setValue(result.getReference()); - } - - for (Map.Entry<StateHandleID, StreamStateHandle> oldSstFileName : referencedSharedState.entrySet()) { - SharedStateRegistryKey registryKey = - createSharedStateRegistryKeyFromFileName(oldSstFileName.getKey()); - - SharedStateRegistry.Result result = stateRegistry.obtainReference(registryKey); - - // Again we update our state handle with the result from the registry, thus replacing - // placeholder state handles with the originals. - oldSstFileName.setValue(result.getReference()); - } - - // Migrate state from unregistered to registered, so that it will not count as private state - // for #discardState() from now. 
- referencedSharedState.putAll(createdSharedState); - createdSharedState.clear(); - - registered = true; - } - - @Override - public void unregisterSharedStates(SharedStateRegistry stateRegistry) { - - Preconditions.checkState(registered, "The state handle has not registered its shared states yet."); - - for (Map.Entry<StateHandleID, StreamStateHandle> newSstFileEntry : createdSharedState.entrySet()) { - SharedStateRegistryKey registryKey = - createSharedStateRegistryKeyFromFileName(newSstFileEntry.getKey()); - stateRegistry.releaseReference(registryKey); - } - - for (Map.Entry<StateHandleID, StreamStateHandle> oldSstFileEntry : referencedSharedState.entrySet()) { - SharedStateRegistryKey registryKey = - createSharedStateRegistryKeyFromFileName(oldSstFileEntry.getKey()); - stateRegistry.releaseReference(registryKey); + sharedStateHandle.setValue(result.getReference()); } - - registered = false; } - private SharedStateRegistryKey createSharedStateRegistryKeyFromFileName(StateHandleID shId) { + /** + * Create a unique key to register one of our shared state handles. 
+ */ + @VisibleForTesting + public SharedStateRegistryKey createSharedStateRegistryKeyFromFileName(StateHandleID shId) { return new SharedStateRegistryKey(operatorIdentifier + '-' + keyGroupRange, shId); } @@ -293,10 +258,7 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle { if (!getKeyGroupRange().equals(that.getKeyGroupRange())) { return false; } - if (!getCreatedSharedState().equals(that.getCreatedSharedState())) { - return false; - } - if (!getReferencedSharedState().equals(that.getReferencedSharedState())) { + if (!getSharedState().equals(that.getSharedState())) { return false; } if (!getPrivateState().equals(that.getPrivateState())) { @@ -314,8 +276,7 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle { int result = getOperatorIdentifier().hashCode(); result = 31 * result + getKeyGroupRange().hashCode(); result = 31 * result + (int) (getCheckpointId() ^ (getCheckpointId() >>> 32)); - result = 31 * result + getCreatedSharedState().hashCode(); - result = 31 * result + getReferencedSharedState().hashCode(); + result = 31 * result + getSharedState().hashCode(); result = 31 * result + getPrivateState().hashCode(); result = 31 * result + getMetaStateHandle().hashCode(); return result; http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java index 8280460..8e38ad4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java @@ -98,11 +98,6 @@ public class KeyGroupsStateHandle implements StreamStateHandle, KeyedStateHandle } @Override - public void 
unregisterSharedStates(SharedStateRegistry stateRegistry) { - // No shared states - } - - @Override public void discardState() throws Exception { stateHandle.discardState(); } http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java index 2136061..7c948a1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java @@ -18,29 +18,20 @@ package org.apache.flink.runtime.state; -import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; /** * A placeholder state handle for shared state that will replaced by an original that was - * created in a previous checkpoint. So we don't have to send the handle twice, e.g. in - * case of {@link ByteStreamStateHandle}. To be used in the referenced states of + * created in a previous checkpoint. So we don't have to send a state handle twice, e.g. in + * case of {@link ByteStreamStateHandle}. This class is used in the referenced states of * {@link IncrementalKeyedStateHandle}. - * <p> - * IMPORTANT: This class currently overrides equals and hash code only for testing purposes. They - * should not be called from production code. This means this class is also not suited to serve as - * a key, e.g. in hash maps. 
*/ public class PlaceholderStreamStateHandle implements StreamStateHandle { private static final long serialVersionUID = 1L; - /** We remember the size of the original file for which this is a placeholder */ - private final long originalSize; - - public PlaceholderStreamStateHandle(long originalSize) { - this.originalSize = originalSize; + public PlaceholderStreamStateHandle() { } @Override @@ -56,33 +47,6 @@ public class PlaceholderStreamStateHandle implements StreamStateHandle { @Override public long getStateSize() { - return originalSize; - } - - /** - * This method is should only be called in tests! This should never serve as key in a hash map. - */ - @VisibleForTesting - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - PlaceholderStreamStateHandle that = (PlaceholderStreamStateHandle) o; - - return originalSize == that.originalSize; - } - - /** - * This method is should only be called in tests! This should never serve as key in a hash map. - */ - @VisibleForTesting - @Override - public int hashCode() { - return (int) (originalSize ^ (originalSize >>> 32)); + return 0L; } } http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java index f9161b0..a5e0f84 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java @@ -54,7 +54,7 @@ public class SharedStateRegistry { } /** - * Register a reference to the given (supposedly new) shared state in the registry. 
+ * Register a reference to the given shared state in the registry. * This does the following: We check if the state handle is actually new by the * registrationKey. If it is new, we register it with a reference count of 1. If there is * already a state handle registered under the given key, we dispose the given "new" state @@ -62,14 +62,14 @@ public class SharedStateRegistry { * a replacement with the result. * * <p>IMPORTANT: caller should check the state handle returned by the result, because the - * registry is performing deduplication and could potentially return a handle that is supposed + * registry is performing de-duplication and could potentially return a handle that is supposed * to replace the one from the registration request. * * @param state the shared state for which we register a reference. * @return the result of this registration request, consisting of the state handle that is * registered under the key by the end of the oepration and its current reference count. */ - public Result registerNewReference(SharedStateRegistryKey registrationKey, StreamStateHandle state) { + public Result registerReference(SharedStateRegistryKey registrationKey, StreamStateHandle state) { Preconditions.checkNotNull(state); @@ -96,28 +96,6 @@ public class SharedStateRegistry { } /** - * Obtains one reference to the given shared state in the registry. This increases the - * reference count by one. - * - * @param registrationKey the shared state for which we obtain a reference. - * @return the shared state for which we release a reference. - * @return the result of the request, consisting of the reference count after this operation - * and the state handle. 
- */ - public Result obtainReference(SharedStateRegistryKey registrationKey) { - - Preconditions.checkNotNull(registrationKey); - - synchronized (registeredStates) { - SharedStateRegistry.SharedStateEntry entry = - Preconditions.checkNotNull(registeredStates.get(registrationKey), - "Could not find a state for the given registration key!"); - entry.increaseReferenceCount(); - return new Result(entry); - } - } - - /** * Releases one reference to the given shared state in the registry. This decreases the * reference count by one. Once the count reaches zero, the shared state is deleted. * @@ -125,7 +103,7 @@ public class SharedStateRegistry { * @return the result of the request, consisting of the reference count after this operation * and the state handle, or null if the state handle was deleted through this request. */ - public Result releaseReference(SharedStateRegistryKey registrationKey) { + public Result unregisterReference(SharedStateRegistryKey registrationKey) { Preconditions.checkNotNull(registrationKey); @@ -172,30 +150,18 @@ public class SharedStateRegistry { } } - /** - * Unregister all the shared states referenced by the given. - * - * @param stateHandles The shared states to unregister. - */ - public void unregisterAll(Iterable<? extends CompositeStateHandle> stateHandles) { - if (stateHandles == null) { - return; - } - - synchronized (registeredStates) { - for (CompositeStateHandle stateHandle : stateHandles) { - stateHandle.unregisterSharedStates(this); - } - } - } - private void scheduleAsyncDelete(StreamStateHandle streamStateHandle) { - if (streamStateHandle != null) { + // We do the small optimization to not issue discards for placeholders, which are NOPs. 
+ if (streamStateHandle != null && !isPlaceholder(streamStateHandle)) { asyncDisposalExecutor.execute( new SharedStateRegistry.AsyncDisposalRunnable(streamStateHandle)); } } + private boolean isPlaceholder(StreamStateHandle stateHandle) { + return stateHandle instanceof PlaceholderStreamStateHandle; + } + /** * An entry in the registry, tracking the handle and the corresponding reference count. */ http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java index 42703f8..9ba9d35 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java @@ -91,6 +91,13 @@ public class ByteStreamStateHandle implements StreamStateHandle { return 31 * handleName.hashCode(); } + @Override + public String toString() { + return "ByteStreamStateHandle{" + + "handleName='" + handleName + '\'' + + '}'; + } + /** * An input stream view on a byte array. 
*/ http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index 9250634..3b44d9a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -639,12 +639,6 @@ public class CheckpointCoordinatorTest { assertEquals(checkpointIdNew, successNew.getCheckpointID()); assertTrue(successNew.getOperatorStates().isEmpty()); - // validate that the subtask states in old savepoint have unregister their shared states - { - verify(subtaskState1, times(1)).unregisterSharedStates(any(SharedStateRegistry.class)); - verify(subtaskState2, times(1)).unregisterSharedStates(any(SharedStateRegistry.class)); - } - // validate that the relevant tasks got a confirmation message { verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew), any(CheckpointOptions.class)); @@ -925,9 +919,6 @@ public class CheckpointCoordinatorTest { verify(subtaskState1_2, times(1)).discardState(); // validate that all subtask states in the second checkpoint are not discarded - verify(subtaskState2_1, never()).unregisterSharedStates(any(SharedStateRegistry.class)); - verify(subtaskState2_2, never()).unregisterSharedStates(any(SharedStateRegistry.class)); - verify(subtaskState2_3, never()).unregisterSharedStates(any(SharedStateRegistry.class)); verify(subtaskState2_1, never()).discardState(); verify(subtaskState2_2, never()).discardState(); verify(subtaskState2_3, never()).discardState(); @@ -951,9 +942,6 @@ public class CheckpointCoordinatorTest 
{ coord.shutdown(JobStatus.FINISHED); // validate that the states in the second checkpoint have been discarded - verify(subtaskState2_1, times(1)).unregisterSharedStates(any(SharedStateRegistry.class)); - verify(subtaskState2_2, times(1)).unregisterSharedStates(any(SharedStateRegistry.class)); - verify(subtaskState2_3, times(1)).unregisterSharedStates(any(SharedStateRegistry.class)); verify(subtaskState2_1, times(1)).discardState(); verify(subtaskState2_2, times(1)).discardState(); verify(subtaskState2_3, times(1)).discardState(); @@ -1562,10 +1550,6 @@ public class CheckpointCoordinatorTest { verify(subtaskState1, never()).discardState(); verify(subtaskState2, never()).discardState(); - // Savepoints are not supposed to have any shared state. - verify(subtaskState1, never()).unregisterSharedStates(any(SharedStateRegistry.class)); - verify(subtaskState2, never()).unregisterSharedStates(any(SharedStateRegistry.class)); - // validate that the relevant tasks got a confirmation message { verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew), any(CheckpointOptions.class)); @@ -2088,15 +2072,6 @@ public class CheckpointCoordinatorTest { // shutdown the store store.shutdown(JobStatus.SUSPENDED); - // All shared states should be unregistered once the store is shut down - for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) { - for (OperatorState taskState : completedCheckpoint.getOperatorStates().values()) { - for (OperatorSubtaskState subtaskState : taskState.getStates()) { - verify(subtaskState, times(1)).unregisterSharedStates(any(SharedStateRegistry.class)); - } - } - } - // restore the store Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java index 985c662..fb5d7c3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java @@ -23,8 +23,8 @@ import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.util.TestLogger; +import org.junit.Assert; import org.junit.Test; -import org.mockito.Mockito; import java.io.IOException; import java.util.Collection; @@ -37,11 +37,6 @@ import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; /** * Test for basic {@link CompletedCheckpointStore} contract. 
@@ -114,12 +109,6 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger { expected[i - 1].awaitDiscard(); assertTrue(expected[i - 1].isDiscarded()); assertEquals(1, checkpoints.getNumberOfRetainedCheckpoints()); - - for (OperatorState operatorState : taskStates) { - for (OperatorSubtaskState subtaskState : operatorState.getStates()) { - verify(subtaskState, times(1)).unregisterSharedStates(any(SharedStateRegistry.class)); - } - } } } @@ -209,7 +198,8 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger { operatorGroupState.put(operatorID, operatorState); for (int i = 0; i < numberOfStates; i++) { - OperatorSubtaskState subtaskState = mock(OperatorSubtaskState.class); + OperatorSubtaskState subtaskState = + new TestOperatorSubtaskState(); operatorState.putState(i, subtaskState); } @@ -217,18 +207,10 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger { return new TestCompletedCheckpoint(new JobID(), id, 0, operatorGroupState, props); } - protected void resetCheckpoint(Collection<OperatorState> operatorStates) { - for (OperatorState operatorState : operatorStates) { - for (OperatorSubtaskState subtaskState : operatorState.getStates()) { - Mockito.reset(subtaskState); - } - } - } - protected void verifyCheckpointRegistered(Collection<OperatorState> operatorStates, SharedStateRegistry registry) { for (OperatorState operatorState : operatorStates) { for (OperatorSubtaskState subtaskState : operatorState.getStates()) { - verify(subtaskState, times(1)).registerSharedStates(eq(registry)); + Assert.assertTrue(((TestOperatorSubtaskState)subtaskState).registered); } } } @@ -236,7 +218,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger { protected void verifyCheckpointDiscarded(Collection<OperatorState> operatorStates) { for (OperatorState operatorState : operatorStates) { for (OperatorSubtaskState subtaskState : operatorState.getStates()) { - verify(subtaskState, times(1)).discardState(); + 
Assert.assertTrue(((TestOperatorSubtaskState)subtaskState).discarded); } } } @@ -333,4 +315,37 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger { } } + static class TestOperatorSubtaskState extends OperatorSubtaskState { + private static final long serialVersionUID = 522580433699164230L; + + boolean registered; + boolean discarded; + + public TestOperatorSubtaskState() { + super(null, null, null, null, null); + this.registered = false; + this.discarded = false; + } + + @Override + public void discardState() { + super.discardState(); + Assert.assertFalse(discarded); + discarded = true; + registered = false; + } + + @Override + public void registerSharedStates(SharedStateRegistry sharedStateRegistry) { + super.registerSharedStates(sharedStateRegistry); + Assert.assertFalse(discarded); + registered = true; + } + + public void reset() { + registered = false; + discarded = false; + } + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java index 589ff46..0bbb961 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java @@ -100,7 +100,6 @@ public class CompletedCheckpointTest { checkpoint.discardOnSubsume(sharedStateRegistry); verify(state, times(1)).discardState(); - verify(state, times(1)).unregisterSharedStates(sharedStateRegistry); } /** @@ -138,7 +137,6 @@ public class CompletedCheckpointTest { checkpoint.discardOnShutdown(status, sharedStateRegistry); verify(state, times(0)).discardState(); assertEquals(true, file.exists()); - 
verify(state, times(0)).unregisterSharedStates(sharedStateRegistry); // Discard props = new CheckpointProperties(false, false, true, true, true, true, true); @@ -152,7 +150,6 @@ public class CompletedCheckpointTest { checkpoint.discardOnShutdown(status, sharedStateRegistry); verify(state, times(1)).discardState(); - verify(state, times(1)).unregisterSharedStates(sharedStateRegistry); } } http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java index 6df01a0..a96b597 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java @@ -197,7 +197,6 @@ public class PendingCheckpointTest { OperatorState state = mock(OperatorState.class); doNothing().when(state).registerSharedStates(any(SharedStateRegistry.class)); - doNothing().when(state).unregisterSharedStates(any(SharedStateRegistry.class)); String targetDir = tmpFolder.newFolder().getAbsolutePath(); http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java index 0d93289..44c802b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java @@ -34,6 +34,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import static org.junit.Assert.assertEquals; @@ -100,11 +101,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint assertEquals(3, ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINT_PATH).size()); assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints()); - resetCheckpoint(expected[0].getOperatorStates().values()); - resetCheckpoint(expected[1].getOperatorStates().values()); - resetCheckpoint(expected[2].getOperatorStates().values()); - - // Recover TODO!!! clear registry! + // Recover checkpoints.recover(); assertEquals(3, ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINT_PATH).size()); http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java index b63782d..f985573 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java @@ -34,7 +34,6 @@ import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle.StateMetaInfo; -import org.apache.flink.runtime.state.PlaceholderStreamStateHandle; import 
org.apache.flink.runtime.state.StateHandleID; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.util.TestByteStreamStateHandleDeepCompare; @@ -273,18 +272,17 @@ public class CheckpointTestUtils { private CheckpointTestUtils() {} - private static IncrementalKeyedStateHandle createDummyIncrementalKeyedStateHandle(Random rnd) { + public static IncrementalKeyedStateHandle createDummyIncrementalKeyedStateHandle(Random rnd) { return new IncrementalKeyedStateHandle( createRandomUUID(rnd).toString(), new KeyGroupRange(1, 1), 42L, - createRandomOwnedHandleMap(rnd), - createRandomReferencedHandleMap(rnd), - createRandomOwnedHandleMap(rnd), + createRandomStateHandleMap(rnd), + createRandomStateHandleMap(rnd), createDummyStreamStateHandle(rnd)); } - private static Map<StateHandleID, StreamStateHandle> createRandomOwnedHandleMap(Random rnd) { + public static Map<StateHandleID, StreamStateHandle> createRandomStateHandleMap(Random rnd) { final int size = rnd.nextInt(4); Map<StateHandleID, StreamStateHandle> result = new HashMap<>(size); for (int i = 0; i < size; ++i) { @@ -296,24 +294,13 @@ public class CheckpointTestUtils { return result; } - private static Map<StateHandleID, StreamStateHandle> createRandomReferencedHandleMap(Random rnd) { - final int size = rnd.nextInt(4); - Map<StateHandleID, StreamStateHandle> result = new HashMap<>(size); - for (int i = 0; i < size; ++i) { - StateHandleID randomId = new StateHandleID(createRandomUUID(rnd).toString()); - result.put(randomId, new PlaceholderStreamStateHandle(rnd.nextInt(1024))); - } - - return result; - } - - private static KeyGroupsStateHandle createDummyKeyGroupStateHandle(Random rnd) { + public static KeyGroupsStateHandle createDummyKeyGroupStateHandle(Random rnd) { return new KeyGroupsStateHandle( new KeyGroupRangeOffsets(1, 1, new long[]{rnd.nextInt(1024)}), createDummyStreamStateHandle(rnd)); } - private static StreamStateHandle createDummyStreamStateHandle(Random rnd) { + public 
static StreamStateHandle createDummyStreamStateHandle(Random rnd) { return new TestByteStreamStateHandleDeepCompare( String.valueOf(createRandomUUID(rnd)), String.valueOf(createRandomUUID(rnd)).getBytes(ConfigConstants.DEFAULT_CHARSET));
