[FLINK-6537] [checkpoint] First set of fixes for (de)registration of shared state in incremental checkpoints
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4745d0c0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4745d0c0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4745d0c0 Branch: refs/heads/master Commit: 4745d0c0822ba1f1c32568d0c4869cb44fa35426 Parents: b54f448 Author: Stefan Richter <[email protected]> Authored: Wed May 10 17:59:39 2017 +0200 Committer: Stefan Richter <[email protected]> Committed: Sun May 14 13:49:50 2017 +0200 ---------------------------------------------------------------------- .../RocksDBIncrementalKeyedStateHandle.java | 123 ++++++------ .../state/RocksDBKeyedStateBackend.java | 64 +++++- .../runtime/state/SharedStateRegistry.java | 196 ++++++++++++++----- .../runtime/state/SharedStateRegistryKey.java | 68 +++++++ .../runtime/state/SharedStateRegistryTest.java | 85 +++++--- .../runtime/state/StateBackendTestBase.java | 18 +- 6 files changed, 397 insertions(+), 157 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4745d0c0/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java index 5ac9e46..961182d 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java @@ -18,12 +18,11 @@ package 
org.apache.flink.contrib.streaming.state; -import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.state.CompositeStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateHandle; -import org.apache.flink.runtime.state.SharedStateHandle; import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.SharedStateRegistryKey; import org.apache.flink.runtime.state.StateUtil; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.util.Preconditions; @@ -54,17 +53,15 @@ public class RocksDBIncrementalKeyedStateHandle implements KeyedStateHandle, Com private static final long serialVersionUID = -8328808513197388231L; - private final JobID jobId; - private final String operatorIdentifier; private final KeyGroupRange keyGroupRange; private final long checkpointId; - private final Map<String, StreamStateHandle> newSstFiles; + private final Map<String, StreamStateHandle> unregisteredSstFiles; - private final Map<String, StreamStateHandle> oldSstFiles; + private final Map<String, StreamStateHandle> registeredSstFiles; private final Map<String, StreamStateHandle> miscFiles; @@ -81,21 +78,19 @@ public class RocksDBIncrementalKeyedStateHandle implements KeyedStateHandle, Com private boolean registered; RocksDBIncrementalKeyedStateHandle( - JobID jobId, String operatorIdentifier, KeyGroupRange keyGroupRange, long checkpointId, - Map<String, StreamStateHandle> newSstFiles, - Map<String, StreamStateHandle> oldSstFiles, + Map<String, StreamStateHandle> unregisteredSstFiles, + Map<String, StreamStateHandle> registeredSstFiles, Map<String, StreamStateHandle> miscFiles, StreamStateHandle metaStateHandle) { - this.jobId = Preconditions.checkNotNull(jobId); this.operatorIdentifier = Preconditions.checkNotNull(operatorIdentifier); this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange); this.checkpointId = checkpointId; - this.newSstFiles = 
Preconditions.checkNotNull(newSstFiles); - this.oldSstFiles = Preconditions.checkNotNull(oldSstFiles); + this.unregisteredSstFiles = Preconditions.checkNotNull(unregisteredSstFiles); + this.registeredSstFiles = Preconditions.checkNotNull(registeredSstFiles); this.miscFiles = Preconditions.checkNotNull(miscFiles); this.metaStateHandle = Preconditions.checkNotNull(metaStateHandle); this.registered = false; @@ -110,12 +105,12 @@ public class RocksDBIncrementalKeyedStateHandle implements KeyedStateHandle, Com return checkpointId; } - Map<String, StreamStateHandle> getNewSstFiles() { - return newSstFiles; + Map<String, StreamStateHandle> getUnregisteredSstFiles() { + return unregisteredSstFiles; } - Map<String, StreamStateHandle> getOldSstFiles() { - return oldSstFiles; + Map<String, StreamStateHandle> getRegisteredSstFiles() { + return registeredSstFiles; } Map<String, StreamStateHandle> getMiscFiles() { @@ -138,6 +133,8 @@ public class RocksDBIncrementalKeyedStateHandle implements KeyedStateHandle, Com @Override public void discardState() throws Exception { + Preconditions.checkState(!registered, "Attempt to dispose a registered composite state with registered shared state. 
Must unregister first."); + try { metaStateHandle.discardState(); } catch (Exception e) { @@ -150,24 +147,23 @@ public class RocksDBIncrementalKeyedStateHandle implements KeyedStateHandle, Com LOG.warn("Could not properly discard misc file states.", e); } - if (!registered) { - try { - StateUtil.bestEffortDiscardAllStateObjects(newSstFiles.values()); - } catch (Exception e) { - LOG.warn("Could not properly discard new sst file states.", e); - } + try { + StateUtil.bestEffortDiscardAllStateObjects(unregisteredSstFiles.values()); + } catch (Exception e) { + LOG.warn("Could not properly discard new sst file states.", e); } + } @Override public long getStateSize() { long size = StateUtil.getStateSize(metaStateHandle); - for (StreamStateHandle newSstFileHandle : newSstFiles.values()) { + for (StreamStateHandle newSstFileHandle : unregisteredSstFiles.values()) { size += newSstFileHandle.getStateSize(); } - for (StreamStateHandle oldSstFileHandle : oldSstFiles.values()) { + for (StreamStateHandle oldSstFileHandle : registeredSstFiles.values()) { size += oldSstFileHandle.getStateSize(); } @@ -180,69 +176,66 @@ public class RocksDBIncrementalKeyedStateHandle implements KeyedStateHandle, Com @Override public void registerSharedStates(SharedStateRegistry stateRegistry) { + Preconditions.checkState(!registered, "The state handle has already registered its shared states."); - for (Map.Entry<String, StreamStateHandle> newSstFileEntry : newSstFiles.entrySet()) { - SstFileStateHandle stateHandle = new SstFileStateHandle(newSstFileEntry.getKey(), newSstFileEntry.getValue()); + for (Map.Entry<String, StreamStateHandle> newSstFileEntry : unregisteredSstFiles.entrySet()) { + SharedStateRegistryKey registryKey = + createSharedStateRegistryKeyFromFileName(newSstFileEntry.getKey()); - int referenceCount = stateRegistry.register(stateHandle); - Preconditions.checkState(referenceCount == 1); + SharedStateRegistry.Result result = + stateRegistry.registerNewReference(registryKey, 
newSstFileEntry.getValue()); + + // We update our reference with the result from the registry, to prevent the following + // problem: + // A previous checkpoint n has already registered the state. This can happen if a + // following checkpoint (n + x) wants to reference the same state before the backend got + // notified that checkpoint n completed. In this case, the shared registry did + // deduplication and returns the previous reference. + newSstFileEntry.setValue(result.getReference()); } - for (Map.Entry<String, StreamStateHandle> oldSstFileEntry : oldSstFiles.entrySet()) { - SstFileStateHandle stateHandle = new SstFileStateHandle(oldSstFileEntry.getKey(), oldSstFileEntry.getValue()); + for (Map.Entry<String, StreamStateHandle> oldSstFileName : registeredSstFiles.entrySet()) { + SharedStateRegistryKey registryKey = + createSharedStateRegistryKeyFromFileName(oldSstFileName.getKey()); + + SharedStateRegistry.Result result = stateRegistry.obtainReference(registryKey); - int referenceCount = stateRegistry.register(stateHandle); - Preconditions.checkState(referenceCount > 1); + // Again we update our state handle with the result from the registry, thus replacing + // placeholder state handles with the originals. + oldSstFileName.setValue(result.getReference()); } + // Migrate state from unregistered to registered, so that it will not count as private state + // for #discardState() from now. 
+ registeredSstFiles.putAll(unregisteredSstFiles); + unregisteredSstFiles.clear(); + registered = true; } @Override public void unregisterSharedStates(SharedStateRegistry stateRegistry) { + Preconditions.checkState(registered, "The state handle has not registered its shared states yet."); - for (Map.Entry<String, StreamStateHandle> newSstFileEntry : newSstFiles.entrySet()) { - stateRegistry.unregister(new SstFileStateHandle(newSstFileEntry.getKey(), newSstFileEntry.getValue())); + for (Map.Entry<String, StreamStateHandle> newSstFileEntry : unregisteredSstFiles.entrySet()) { + SharedStateRegistryKey registryKey = + createSharedStateRegistryKeyFromFileName(newSstFileEntry.getKey()); + stateRegistry.releaseReference(registryKey); } - for (Map.Entry<String, StreamStateHandle> oldSstFileEntry : oldSstFiles.entrySet()) { - stateRegistry.unregister(new SstFileStateHandle(oldSstFileEntry.getKey(), oldSstFileEntry.getValue())); + for (Map.Entry<String, StreamStateHandle> oldSstFileEntry : registeredSstFiles.entrySet()) { + SharedStateRegistryKey registryKey = + createSharedStateRegistryKeyFromFileName(oldSstFileEntry.getKey()); + stateRegistry.releaseReference(registryKey); } registered = false; } - private class SstFileStateHandle implements SharedStateHandle { - - private static final long serialVersionUID = 9092049285789170669L; - - private final String fileName; - - private final StreamStateHandle delegateStateHandle; - - private SstFileStateHandle( - String fileName, - StreamStateHandle delegateStateHandle) { - this.fileName = fileName; - this.delegateStateHandle = delegateStateHandle; - } - - @Override - public String getRegistrationKey() { - return jobId + "-" + operatorIdentifier + "-" + keyGroupRange + "-" + fileName; - } - - @Override - public void discardState() throws Exception { - delegateStateHandle.discardState(); - } - - @Override - public long getStateSize() { - return delegateStateHandle.getStateSize(); - } + private SharedStateRegistryKey 
createSharedStateRegistryKeyFromFileName(String fileName) { + return new SharedStateRegistryKey(operatorIdentifier + "-" + keyGroupRange + "-" + fileName); } } http://git-wip-us.apache.org/repos/asf/flink/blob/4745d0c0/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 6af53c3..1080e59 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -54,7 +54,6 @@ import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; -import org.apache.flink.runtime.state.StateMigrationUtil; import org.apache.flink.runtime.state.DoneFuture; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; @@ -62,9 +61,10 @@ import org.apache.flink.runtime.state.KeyGroupRangeOffsets; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedBackendSerializationProxy; import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo; +import org.apache.flink.runtime.state.StateMigrationUtil; import org.apache.flink.runtime.state.StateObject; import org.apache.flink.runtime.state.StateUtil; -import 
org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.internal.InternalAggregatingState; import org.apache.flink.runtime.state.internal.InternalFoldingState; @@ -72,6 +72,7 @@ import org.apache.flink.runtime.state.internal.InternalListState; import org.apache.flink.runtime.state.internal.InternalMapState; import org.apache.flink.runtime.state.internal.InternalReducingState; import org.apache.flink.runtime.state.internal.InternalValueState; +import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; import org.apache.flink.runtime.util.SerializableObject; import org.apache.flink.util.FileUtils; import org.apache.flink.util.IOUtils; @@ -709,16 +710,22 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private static final class RocksDBIncrementalSnapshotOperation { + /** The backend which we snapshot */ private final RocksDBKeyedStateBackend<?> stateBackend; + /** Stream factory that creates the output streams to DFS */ private final CheckpointStreamFactory checkpointStreamFactory; + /** Id for the current checkpoint */ private final long checkpointId; + /** Timestamp for the current checkpoint */ private final long checkpointTimestamp; + /** All sst files that were part of the last previously completed checkpoint */ private Map<String, StreamStateHandle> baseSstFiles; + /** The state meta data */ private final List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots = new ArrayList<>(); private FileSystem backupFileSystem; @@ -864,10 +871,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { if (fileHandle == null) { fileHandle = materializeStateData(filePath); - newSstFiles.put(fileName, fileHandle); } else { - oldSstFiles.put(fileName, fileHandle); + // we introduce a placeholder state handle that is replaced with the + // original from the shared state registry
(created from a previous checkpoint) + oldSstFiles.put(fileName, new PlaceholderStreamStateHandle(fileHandle.getStateSize())); } } else { StreamStateHandle fileHandle = materializeStateData(filePath); @@ -882,9 +890,14 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { stateBackend.materializedSstFiles.put(checkpointId, sstFiles); - return new RocksDBIncrementalKeyedStateHandle(stateBackend.jobId, - stateBackend.operatorIdentifier, stateBackend.keyGroupRange, - checkpointId, newSstFiles, oldSstFiles, miscFiles, metaStateHandle); + return new RocksDBIncrementalKeyedStateHandle( + stateBackend.operatorIdentifier, + stateBackend.keyGroupRange, + checkpointId, + newSstFiles, + oldSstFiles, + miscFiles, + metaStateHandle); } void stop() { @@ -922,6 +935,39 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } } } + + /** + * A placeholder state handle for shared state that will be replaced by an original that was + * created in a previous checkpoint, so that we don't have to send the handle twice, e.g. in + * case of {@link ByteStreamStateHandle}. + */ + private static final class PlaceholderStreamStateHandle implements StreamStateHandle { + + private static final long serialVersionUID = 1L; + + /** We remember the size of the original file for which this is a placeholder */ + private final long originalSize; + + public PlaceholderStreamStateHandle(long originalSize) { + this.originalSize = originalSize; + } + + @Override + public FSDataInputStream openInputStream() { + throw new UnsupportedOperationException( + "This is only a placeholder to be replaced by a real StreamStateHandle in the checkpoint coordinator."); + } + + @Override + public void discardState() throws Exception { + // nothing to do.
+ } + + @Override + public long getStateSize() { + return originalSize; + } + } } @Override @@ -1260,7 +1306,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { UUID.randomUUID().toString()); try { - Map<String, StreamStateHandle> newSstFiles = restoreStateHandle.getNewSstFiles(); + Map<String, StreamStateHandle> newSstFiles = restoreStateHandle.getUnregisteredSstFiles(); for (Map.Entry<String, StreamStateHandle> newSstFileEntry : newSstFiles.entrySet()) { String fileName = newSstFileEntry.getKey(); StreamStateHandle remoteFileHandle = newSstFileEntry.getValue(); @@ -1268,7 +1314,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { readStateData(new Path(restoreInstancePath, fileName), remoteFileHandle); } - Map<String, StreamStateHandle> oldSstFiles = restoreStateHandle.getOldSstFiles(); + Map<String, StreamStateHandle> oldSstFiles = restoreStateHandle.getRegisteredSstFiles(); for (Map.Entry<String, StreamStateHandle> oldSstFileEntry : oldSstFiles.entrySet()) { String fileName = oldSstFileEntry.getKey(); StreamStateHandle remoteFileHandle = oldSstFileEntry.getValue(); http://git-wip-us.apache.org/repos/asf/flink/blob/4745d0c0/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java index 2cb43ac..dbf4642 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java @@ -18,91 +18,137 @@ package org.apache.flink.runtime.state; +import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import 
java.util.HashMap; import java.util.Map; +import java.util.Objects; +import java.util.concurrent.Executor; /** * A {@code SharedStateRegistry} will be deployed in the - * {@link org.apache.flink.runtime.checkpoint.CheckpointCoordinator} to + * {@link org.apache.flink.runtime.checkpoint.CompletedCheckpointStore} to * maintain the reference count of {@link SharedStateHandle}s which are shared - * among different checkpoints. - * + * among different incremental checkpoints. */ public class SharedStateRegistry { private static final Logger LOG = LoggerFactory.getLogger(SharedStateRegistry.class); /** All registered state objects by an artificial key */ - private final Map<String, SharedStateRegistry.SharedStateEntry> registeredStates; + private final Map<SharedStateRegistryKey, SharedStateRegistry.SharedStateEntry> registeredStates; + + /** Executor for async state deletion */ + private final Executor asyncDisposalExecutor; public SharedStateRegistry() { this.registeredStates = new HashMap<>(); + this.asyncDisposalExecutor = Executors.directExecutor(); //TODO: FLINK-6534 } /** - * Register a reference to the given shared state in the registry. This increases the reference - * count for the this shared state by one. Returns the reference count after the update. + * Register a reference to the given (supposedly new) shared state in the registry. + * This does the following: We check if the state handle is actually new by the + * registrationKey. If it is new, we register it with a reference count of 1. If there is + * already a state handle registered under the given key, we schedule the given "new" state + * handle for disposal (unless it equals the registered one), increment the reference count of + * the previously existing state handle, and return that handle as the replacement in the result.
+ * + * <p>IMPORTANT: caller should check the state handle returned by the result, because the + * registry is performing deduplication and could potentially return a handle that is supposed + * to replace the one from the registration request. * * @param state the shared state for which we register a reference. - * @return the updated reference count for the given shared state. + * @return the result of this registration request, consisting of the state handle that is + * registered under the key by the end of the operation and its current reference count. */ - public int register(SharedStateHandle state) { - if (state == null) { - return 0; - } + public Result registerNewReference(SharedStateRegistryKey registrationKey, StreamStateHandle state) { + + Preconditions.checkNotNull(state); + + StreamStateHandle scheduledStateDeletion = null; + SharedStateRegistry.SharedStateEntry entry; synchronized (registeredStates) { - SharedStateRegistry.SharedStateEntry entry = - registeredStates.get(state.getRegistrationKey()); + entry = registeredStates.get(registrationKey); if (entry == null) { - SharedStateRegistry.SharedStateEntry stateEntry = - new SharedStateRegistry.SharedStateEntry(state); - registeredStates.put(state.getRegistrationKey(), stateEntry); - return 1; + entry = new SharedStateRegistry.SharedStateEntry(state); + registeredStates.put(registrationKey, entry); } else { + // delete if this is a real duplicate + if (!Objects.equals(state, entry.state)) { + scheduledStateDeletion = state; + } entry.increaseReferenceCount(); - return entry.getReferenceCount(); } } + + scheduleAsyncDelete(scheduledStateDeletion); + return new Result(entry); } /** - * Unregister one reference to the given shared state in the registry. This decreases the - * reference count by one. Once the count reaches zero, the shared state is deleted. + * Obtains one reference to the given shared state in the registry. This increases the + * reference count by one.
* - * @param state the shared state for which we unregister a reference. - * @return the reference count for the shared state after the update. + * @param registrationKey the shared state for which we obtain a reference. + * @return the shared state for which we release a reference. + * @return the result of the request, consisting of the reference count after this operation + * and the state handle. */ - public int unregister(SharedStateHandle state) { - if (state == null) { - return 0; + public Result obtainReference(SharedStateRegistryKey registrationKey) { + + Preconditions.checkNotNull(registrationKey); + + synchronized (registeredStates) { + SharedStateRegistry.SharedStateEntry entry = + Preconditions.checkNotNull(registeredStates.get(registrationKey), + "Could not find a state for the given registration key!"); + entry.increaseReferenceCount(); + return new Result(entry); } + } + + /** + * Releases one reference to the given shared state in the registry. This decreases the + * reference count by one. Once the count reaches zero, the shared state is deleted. + * + * @param registrationKey the shared state for which we release a reference. + * @return the result of the request, consisting of the reference count after this operation + * and the state handle, or null if the state handle was deleted through this request. 
+ */ + public Result releaseReference(SharedStateRegistryKey registrationKey) { + + Preconditions.checkNotNull(registrationKey); + + final Result result; + final StreamStateHandle scheduledStateDeletion; synchronized (registeredStates) { - SharedStateRegistry.SharedStateEntry entry = registeredStates.get(state.getRegistrationKey()); + SharedStateRegistry.SharedStateEntry entry = registeredStates.get(registrationKey); - Preconditions.checkState(entry != null, "Cannot unregister a state that is not registered."); + Preconditions.checkState(entry != null, + "Cannot unregister a state that is not registered."); entry.decreaseReferenceCount(); - final int newReferenceCount = entry.getReferenceCount(); - // Remove the state from the registry when it's not referenced any more. - if (newReferenceCount <= 0) { - registeredStates.remove(state.getRegistrationKey()); - try { - entry.getState().discardState(); - } catch (Exception e) { - LOG.warn("Cannot properly discard the state {}.", entry.getState(), e); - } + if (entry.getReferenceCount() <= 0) { + registeredStates.remove(registrationKey); + scheduledStateDeletion = entry.getState(); + result = new Result(null, 0); + } else { + scheduledStateDeletion = null; + result = new Result(entry); } - return newReferenceCount; } + + scheduleAsyncDelete(scheduledStateDeletion); + return result; } /** @@ -122,8 +168,6 @@ public class SharedStateRegistry { } } - - /** * Unregister all the shared states referenced by the given. * @@ -141,20 +185,30 @@ public class SharedStateRegistry { } } + private void scheduleAsyncDelete(StreamStateHandle streamStateHandle) { + if (streamStateHandle != null) { + asyncDisposalExecutor.execute( + new SharedStateRegistry.AsyncDisposalRunnable(streamStateHandle)); + } + } + + /** + * An entry in the registry, tracking the handle and the corresponding reference count. 
+ */ private static class SharedStateEntry { - /** The shared object */ - private final SharedStateHandle state; + /** The shared state handle */ + private final StreamStateHandle state; - /** The reference count of the object */ + /** The current reference count of the state handle */ private int referenceCount; - SharedStateEntry(SharedStateHandle value) { + SharedStateEntry(StreamStateHandle value) { this.state = value; this.referenceCount = 1; } - SharedStateHandle getState() { + StreamStateHandle getState() { return state; } @@ -171,14 +225,56 @@ public class SharedStateRegistry { } } - public int getReferenceCount(SharedStateHandle state) { - if (state == null) { - return 0; + /** + * The result of a request to register, obtain, or release a reference to shared state. + */ + public static class Result { + + /** The (un)registered state handle from the request */ + private final StreamStateHandle reference; + + /** The reference count to the state handle after the request to (un)register */ + private final int referenceCount; + + private Result(SharedStateEntry sharedStateEntry) { + this.reference = sharedStateEntry.getState(); + this.referenceCount = sharedStateEntry.getReferenceCount(); } - SharedStateRegistry.SharedStateEntry entry = - registeredStates.get(state.getRegistrationKey()); + public Result(StreamStateHandle reference, int referenceCount) { + Preconditions.checkArgument(referenceCount >= 0); - return entry == null ? 0 : entry.getReferenceCount(); + this.reference = reference; + this.referenceCount = referenceCount; + } + + public StreamStateHandle getReference() { + return reference; + } + + public int getReferenceCount() { + return referenceCount; + } + } + + /** + * Encapsulates the operation to delete a state handle asynchronously.
+ */ + private static final class AsyncDisposalRunnable implements Runnable { + + private final StateObject toDispose; + + public AsyncDisposalRunnable(StateObject toDispose) { + this.toDispose = Preconditions.checkNotNull(toDispose); + } + + @Override + public void run() { + try { + toDispose.discardState(); + } catch (Exception e) { + LOG.warn("A problem occurred during asynchronous disposal of a shared state object: {}", toDispose, e); + } + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/4745d0c0/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryKey.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryKey.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryKey.java new file mode 100644 index 0000000..9e59359 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryKey.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * This class represents a key that uniquely identifies (on a logical level) state handles for + * registration in the {@link SharedStateRegistry}. Two files which should logically + * be the same should have the same {@link SharedStateRegistryKey}. The meaning of logical + * equivalence is up to the application. + */ +public class SharedStateRegistryKey implements Serializable { + + private static final long serialVersionUID = 1L; + + /** Uses a String as internal representation. */ + private final String keyString; + + public SharedStateRegistryKey(String keyString) { + this.keyString = Preconditions.checkNotNull(keyString); + } + + public String getKeyString() { + return keyString; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + SharedStateRegistryKey that = (SharedStateRegistryKey) o; + return keyString.equals(that.keyString); + } + + @Override + public int hashCode() { + return keyString.hashCode(); + } + + @Override + public String toString() { + return keyString; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4745d0c0/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java index 821bb69..03e2a13 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java @@ -19,9 +19,14 @@ package org.apache.flink.runtime.state; +import org.apache.flink.core.fs.FSDataInputStream; import
org.junit.Test; +import java.io.IOException; + +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class SharedStateRegistryTest { @@ -30,24 +35,50 @@ public class SharedStateRegistryTest { */ @Test public void testRegistryNormal() { + SharedStateRegistry sharedStateRegistry = new SharedStateRegistry(); // register one state TestSharedState firstState = new TestSharedState("first"); - assertEquals(1, sharedStateRegistry.register(firstState)); + SharedStateRegistry.Result result = sharedStateRegistry.registerNewReference(firstState.getRegistrationKey(), firstState); + assertEquals(1, result.getReferenceCount()); + assertTrue(firstState == result.getReference()); + assertFalse(firstState.isDiscarded()); // register another state TestSharedState secondState = new TestSharedState("second"); - assertEquals(1, sharedStateRegistry.register(secondState)); - - // register the first state again - assertEquals(2, sharedStateRegistry.register(firstState)); + result = sharedStateRegistry.registerNewReference(secondState.getRegistrationKey(), secondState); + assertEquals(1, result.getReferenceCount()); + assertTrue(secondState == result.getReference()); + assertFalse(firstState.isDiscarded()); + assertFalse(secondState.isDiscarded()); + + // attempt to register state under an existing key + TestSharedState firstStatePrime = new TestSharedState(firstState.getRegistrationKey().getKeyString()); + result = sharedStateRegistry.registerNewReference(firstState.getRegistrationKey(), firstStatePrime); + assertEquals(2, result.getReferenceCount()); + assertFalse(firstStatePrime == result.getReference()); + assertTrue(firstState == result.getReference()); + assertTrue(firstStatePrime.isDiscarded()); + assertFalse(firstState.isDiscarded()); + + // reference the first state again + result = sharedStateRegistry.obtainReference(firstState.getRegistrationKey()); + assertEquals(3,
result.getReferenceCount()); + assertTrue(firstState == result.getReference()); + assertFalse(firstState.isDiscarded()); // unregister the second state - assertEquals(0, sharedStateRegistry.unregister(secondState)); + result = sharedStateRegistry.releaseReference(secondState.getRegistrationKey()); + assertEquals(0, result.getReferenceCount()); + assertTrue(result.getReference() == null); + assertTrue(secondState.isDiscarded()); // unregister the first state - assertEquals(1, sharedStateRegistry.unregister(firstState)); + result = sharedStateRegistry.releaseReference(firstState.getRegistrationKey()); + assertEquals(2, result.getReferenceCount()); + assertTrue(firstState == result.getReference()); + assertFalse(firstState.isDiscarded()); } /** @@ -56,51 +87,47 @@ public class SharedStateRegistryTest { @Test(expected = IllegalStateException.class) public void testUnregisterWithUnexistedKey() { SharedStateRegistry sharedStateRegistry = new SharedStateRegistry(); - - sharedStateRegistry.unregister(new TestSharedState("unexisted")); + sharedStateRegistry.releaseReference(new SharedStateRegistryKey("non-existent")); } - private static class TestSharedState implements SharedStateHandle { + private static class TestSharedState implements StreamStateHandle { private static final long serialVersionUID = 4468635881465159780L; - private String key; + private final SharedStateRegistryKey key; + + private boolean discarded; TestSharedState(String key) { - this.key = key; + this.key = new SharedStateRegistryKey(key); + this.discarded = false; } - @Override - public String getRegistrationKey() { + public SharedStateRegistryKey getRegistrationKey() { return key; } @Override public void discardState() throws Exception { - // nothing to do + this.discarded = true; } @Override public long getStateSize() { - return key.length(); + return key.toString().length(); } @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass())
{ - return false; - } - - TestSharedState testState = (TestSharedState) o; - - return key.equals(testState.key); + public int hashCode() { + return key.hashCode(); } @Override - public int hashCode() { - return key.hashCode(); + public FSDataInputStream openInputStream() throws IOException { + throw new UnsupportedOperationException(); + } + + public boolean isDiscarded() { + return discarded; } } } http://git-wip-us.apache.org/repos/asf/flink/blob/4745d0c0/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index 658ccde..ca66ffb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -482,6 +482,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten @SuppressWarnings("unchecked") public void testKryoRegisteringRestoreResilienceWithDefaultSerializer() throws Exception { CheckpointStreamFactory streamFactory = createStreamFactory(); + SharedStateRegistry sharedStateRegistry = new SharedStateRegistry(); Environment env = new DummyEnvironment("test", 1, 0); AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env); @@ -509,6 +510,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten streamFactory, CheckpointOptions.forFullCheckpoint())); + snapshot.registerSharedStates(sharedStateRegistry); backend.dispose(); // ========== restore snapshot - should use default serializer (ONLY SERIALIZATION) ========== @@ -518,8 +520,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten backend = restoreKeyedBackend(IntSerializer.INSTANCE,
snapshot, env); - snapshot.discardState(); - // re-initialize to ensure that we create the KryoSerializer from scratch, otherwise // initializeSerializerUnlessSet would not pick up our new config kvId = new ValueStateDescriptor<>("id", pojoType); @@ -536,6 +536,11 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten streamFactory, CheckpointOptions.forFullCheckpoint())); + snapshot2.registerSharedStates(sharedStateRegistry); + + snapshot.unregisterSharedStates(sharedStateRegistry); + snapshot.discardState(); + backend.dispose(); // ========= restore snapshot - should use default serializer (FAIL ON DESERIALIZATION) ========= @@ -570,6 +575,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten @Test public void testKryoRegisteringRestoreResilienceWithRegisteredSerializer() throws Exception { CheckpointStreamFactory streamFactory = createStreamFactory(); + SharedStateRegistry sharedStateRegistry = new SharedStateRegistry(); Environment env = new DummyEnvironment("test", 1, 0); AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env); @@ -597,6 +603,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten streamFactory, CheckpointOptions.forFullCheckpoint())); + snapshot.registerSharedStates(sharedStateRegistry); backend.dispose(); // ========== restore snapshot - should use specific serializer (ONLY SERIALIZATION) ========== @@ -605,8 +612,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env); - snapshot.discardState(); - // re-initialize to ensure that we create the KryoSerializer from scratch, otherwise // initializeSerializerUnlessSet would not pick up our new config kvId = new ValueStateDescriptor<>("id", pojoType); @@ -623,6 +628,11 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten streamFactory,
CheckpointOptions.forFullCheckpoint())); + snapshot2.registerSharedStates(sharedStateRegistry); + + snapshot.unregisterSharedStates(sharedStateRegistry); + snapshot.discardState(); + backend.dispose(); // ========= restore snapshot - should use specific serializer (FAIL ON DESERIALIZATION) =========
