[FLINK-6537] [checkpoint] First set of fixes for (de)registration of shared 
state in incremental checkpoints


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b30b8eef
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b30b8eef
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b30b8eef

Branch: refs/heads/release-1.3
Commit: b30b8eef044bd2af27485cd7c09b9b48dadd9dd8
Parents: 5e61a01
Author: Stefan Richter <[email protected]>
Authored: Wed May 10 17:59:39 2017 +0200
Committer: Stefan Richter <[email protected]>
Committed: Sun May 14 14:07:26 2017 +0200

----------------------------------------------------------------------
 .../RocksDBIncrementalKeyedStateHandle.java     | 123 ++++++------
 .../state/RocksDBKeyedStateBackend.java         |  64 +++++-
 .../runtime/state/SharedStateRegistry.java      | 196 ++++++++++++++-----
 .../runtime/state/SharedStateRegistryKey.java   |  68 +++++++
 .../runtime/state/SharedStateRegistryTest.java  |  85 +++++---
 .../runtime/state/StateBackendTestBase.java     |  18 +-
 6 files changed, 397 insertions(+), 157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b30b8eef/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java
index 5ac9e46..961182d 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java
@@ -18,12 +18,11 @@
 
 package org.apache.flink.contrib.streaming.state;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.state.CompositeStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.SharedStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.SharedStateRegistryKey;
 import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.Preconditions;
@@ -54,17 +53,15 @@ public class RocksDBIncrementalKeyedStateHandle implements 
KeyedStateHandle, Com
 
        private static final long serialVersionUID = -8328808513197388231L;
 
-       private final JobID jobId;
-
        private final String operatorIdentifier;
 
        private final KeyGroupRange keyGroupRange;
 
        private final long checkpointId;
 
-       private final Map<String, StreamStateHandle> newSstFiles;
+       private final Map<String, StreamStateHandle> unregisteredSstFiles;
 
-       private final Map<String, StreamStateHandle> oldSstFiles;
+       private final Map<String, StreamStateHandle> registeredSstFiles;
 
        private final Map<String, StreamStateHandle> miscFiles;
 
@@ -81,21 +78,19 @@ public class RocksDBIncrementalKeyedStateHandle implements 
KeyedStateHandle, Com
        private boolean registered;
 
        RocksDBIncrementalKeyedStateHandle(
-                       JobID jobId,
                        String operatorIdentifier,
                        KeyGroupRange keyGroupRange,
                        long checkpointId,
-                       Map<String, StreamStateHandle> newSstFiles,
-                       Map<String, StreamStateHandle> oldSstFiles,
+                       Map<String, StreamStateHandle> unregisteredSstFiles,
+                       Map<String, StreamStateHandle> registeredSstFiles,
                        Map<String, StreamStateHandle> miscFiles,
                        StreamStateHandle metaStateHandle) {
 
-               this.jobId = Preconditions.checkNotNull(jobId);
                this.operatorIdentifier = 
Preconditions.checkNotNull(operatorIdentifier);
                this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
                this.checkpointId = checkpointId;
-               this.newSstFiles = Preconditions.checkNotNull(newSstFiles);
-               this.oldSstFiles = Preconditions.checkNotNull(oldSstFiles);
+               this.unregisteredSstFiles = 
Preconditions.checkNotNull(unregisteredSstFiles);
+               this.registeredSstFiles = 
Preconditions.checkNotNull(registeredSstFiles);
                this.miscFiles = Preconditions.checkNotNull(miscFiles);
                this.metaStateHandle = 
Preconditions.checkNotNull(metaStateHandle);
                this.registered = false;
@@ -110,12 +105,12 @@ public class RocksDBIncrementalKeyedStateHandle 
implements KeyedStateHandle, Com
                return checkpointId;
        }
 
-       Map<String, StreamStateHandle> getNewSstFiles() {
-               return newSstFiles;
+       Map<String, StreamStateHandle> getUnregisteredSstFiles() {
+               return unregisteredSstFiles;
        }
 
-       Map<String, StreamStateHandle> getOldSstFiles() {
-               return oldSstFiles;
+       Map<String, StreamStateHandle> getRegisteredSstFiles() {
+               return registeredSstFiles;
        }
 
        Map<String, StreamStateHandle> getMiscFiles() {
@@ -138,6 +133,8 @@ public class RocksDBIncrementalKeyedStateHandle implements 
KeyedStateHandle, Com
        @Override
        public void discardState() throws Exception {
 
+               Preconditions.checkState(!registered, "Attempt to dispose a 
registered composite state with registered shared state. Must unregister 
first.");
+
                try {
                        metaStateHandle.discardState();
                } catch (Exception e) {
@@ -150,24 +147,23 @@ public class RocksDBIncrementalKeyedStateHandle 
implements KeyedStateHandle, Com
                        LOG.warn("Could not properly discard misc file 
states.", e);
                }
 
-               if (!registered) {
-                       try {
-                               
StateUtil.bestEffortDiscardAllStateObjects(newSstFiles.values());
-                       } catch (Exception e) {
-                               LOG.warn("Could not properly discard new sst 
file states.", e);
-                       }
+               try {
+                       
StateUtil.bestEffortDiscardAllStateObjects(unregisteredSstFiles.values());
+               } catch (Exception e) {
+                       LOG.warn("Could not properly discard new sst file 
states.", e);
                }
+
        }
 
        @Override
        public long getStateSize() {
                long size = StateUtil.getStateSize(metaStateHandle);
 
-               for (StreamStateHandle newSstFileHandle : newSstFiles.values()) 
{
+               for (StreamStateHandle newSstFileHandle : 
unregisteredSstFiles.values()) {
                        size += newSstFileHandle.getStateSize();
                }
 
-               for (StreamStateHandle oldSstFileHandle : oldSstFiles.values()) 
{
+               for (StreamStateHandle oldSstFileHandle : 
registeredSstFiles.values()) {
                        size += oldSstFileHandle.getStateSize();
                }
 
@@ -180,69 +176,66 @@ public class RocksDBIncrementalKeyedStateHandle 
implements KeyedStateHandle, Com
 
        @Override
        public void registerSharedStates(SharedStateRegistry stateRegistry) {
+
                Preconditions.checkState(!registered, "The state handle has 
already registered its shared states.");
 
-               for (Map.Entry<String, StreamStateHandle> newSstFileEntry : 
newSstFiles.entrySet()) {
-                       SstFileStateHandle stateHandle = new 
SstFileStateHandle(newSstFileEntry.getKey(), newSstFileEntry.getValue());
+               for (Map.Entry<String, StreamStateHandle> newSstFileEntry : 
unregisteredSstFiles.entrySet()) {
+                       SharedStateRegistryKey registryKey =
+                               
createSharedStateRegistryKeyFromFileName(newSstFileEntry.getKey());
 
-                       int referenceCount = 
stateRegistry.register(stateHandle);
-                       Preconditions.checkState(referenceCount == 1);
+                       SharedStateRegistry.Result result =
+                               stateRegistry.registerNewReference(registryKey, 
newSstFileEntry.getValue());
+
+                       // We update our reference with the result from the 
registry, to prevent the following
+                       // problem:
+                       // A previous checkpoint n has already registered the 
state. This can happen if a
+                       // following checkpoint (n + x) wants to reference the 
same state before the backend got
+                       // notified that checkpoint n completed. In this case, 
the shared registry did
+                       // deduplication and returns the previous reference.
+                       newSstFileEntry.setValue(result.getReference());
                }
 
-               for (Map.Entry<String, StreamStateHandle> oldSstFileEntry : 
oldSstFiles.entrySet()) {
-                       SstFileStateHandle stateHandle = new 
SstFileStateHandle(oldSstFileEntry.getKey(), oldSstFileEntry.getValue());
+               for (Map.Entry<String, StreamStateHandle> oldSstFileName : 
registeredSstFiles.entrySet()) {
+                       SharedStateRegistryKey registryKey =
+                               
createSharedStateRegistryKeyFromFileName(oldSstFileName.getKey());
+
+                       SharedStateRegistry.Result result = 
stateRegistry.obtainReference(registryKey);
 
-                       int referenceCount = 
stateRegistry.register(stateHandle);
-                       Preconditions.checkState(referenceCount > 1);
+                       // Again we update our state handle with the result 
from the registry, thus replacing
+                       // placeholder state handles with the originals.
+                       oldSstFileName.setValue(result.getReference());
                }
 
+               // Migrate state from unregistered to registered, so that it 
will not count as private state
+               // for #discardState() from now.
+               registeredSstFiles.putAll(unregisteredSstFiles);
+               unregisteredSstFiles.clear();
+
                registered = true;
        }
 
        @Override
        public void unregisterSharedStates(SharedStateRegistry stateRegistry) {
+
                Preconditions.checkState(registered, "The state handle has not 
registered its shared states yet.");
 
-               for (Map.Entry<String, StreamStateHandle> newSstFileEntry : 
newSstFiles.entrySet()) {
-                       stateRegistry.unregister(new 
SstFileStateHandle(newSstFileEntry.getKey(), newSstFileEntry.getValue()));
+               for (Map.Entry<String, StreamStateHandle> newSstFileEntry : 
unregisteredSstFiles.entrySet()) {
+                       SharedStateRegistryKey registryKey =
+                               
createSharedStateRegistryKeyFromFileName(newSstFileEntry.getKey());
+                       stateRegistry.releaseReference(registryKey);
                }
 
-               for (Map.Entry<String, StreamStateHandle> oldSstFileEntry : 
oldSstFiles.entrySet()) {
-                       stateRegistry.unregister(new 
SstFileStateHandle(oldSstFileEntry.getKey(), oldSstFileEntry.getValue()));
+               for (Map.Entry<String, StreamStateHandle> oldSstFileEntry : 
registeredSstFiles.entrySet()) {
+                       SharedStateRegistryKey registryKey =
+                               
createSharedStateRegistryKeyFromFileName(oldSstFileEntry.getKey());
+                       stateRegistry.releaseReference(registryKey);
                }
 
                registered = false;
        }
 
-       private class SstFileStateHandle implements SharedStateHandle {
-
-               private static final long serialVersionUID = 
9092049285789170669L;
-
-               private final String fileName;
-
-               private final StreamStateHandle delegateStateHandle;
-
-               private SstFileStateHandle(
-                               String fileName,
-                               StreamStateHandle delegateStateHandle) {
-                       this.fileName = fileName;
-                       this.delegateStateHandle = delegateStateHandle;
-               }
-
-               @Override
-               public String getRegistrationKey() {
-                       return jobId + "-" + operatorIdentifier + "-" + 
keyGroupRange + "-" + fileName;
-               }
-
-               @Override
-               public void discardState() throws Exception {
-                       delegateStateHandle.discardState();
-               }
-
-               @Override
-               public long getStateSize() {
-                       return delegateStateHandle.getStateSize();
-               }
+       private SharedStateRegistryKey 
createSharedStateRegistryKeyFromFileName(String fileName) {
+               return new SharedStateRegistryKey(operatorIdentifier + "-" + 
keyGroupRange + "-" + fileName);
        }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b30b8eef/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 6af53c3..1080e59 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -54,7 +54,6 @@ import 
org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.StateMigrationUtil;
 import org.apache.flink.runtime.state.DoneFuture;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
@@ -62,9 +61,10 @@ import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.StateMigrationUtil;
 import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.StateUtil;
-import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.internal.InternalAggregatingState;
 import org.apache.flink.runtime.state.internal.InternalFoldingState;
@@ -72,6 +72,7 @@ import 
org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.runtime.state.internal.InternalMapState;
 import org.apache.flink.runtime.state.internal.InternalReducingState;
 import org.apache.flink.runtime.state.internal.InternalValueState;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.IOUtils;
@@ -709,16 +710,22 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
        private static final class RocksDBIncrementalSnapshotOperation {
 
+               /** The backend which we snapshot */
                private final RocksDBKeyedStateBackend<?> stateBackend;
 
+               /** Stream factory that creates the output streams to DFS */
                private final CheckpointStreamFactory checkpointStreamFactory;
 
+               /** Id for the current checkpoint */
                private final long checkpointId;
 
+               /** Timestamp for the current checkpoint */
                private final long checkpointTimestamp;
 
+               /** All sst files that were part of the last previously 
completed checkpoint */
                private Map<String, StreamStateHandle> baseSstFiles;
 
+               /** The state meta data */
                private final 
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots 
= new ArrayList<>();
 
                private FileSystem backupFileSystem;
@@ -864,10 +871,11 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                                                if (fileHandle == null) {
                                                        fileHandle = 
materializeStateData(filePath);
-
                                                        
newSstFiles.put(fileName, fileHandle);
                                                } else {
-                                                       
oldSstFiles.put(fileName, fileHandle);
+                                                       // we introduce a 
placeholder state handle, that is replaced with the
+                                                       // original from the 
shared state registry (created from a previous checkpoint)
+                                                       
oldSstFiles.put(fileName, new 
PlaceholderStreamStateHandle(fileHandle.getStateSize()));
                                                }
                                        } else {
                                                StreamStateHandle fileHandle = 
materializeStateData(filePath);
@@ -882,9 +890,14 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                        stateBackend.materializedSstFiles.put(checkpointId, 
sstFiles);
 
-                       return new 
RocksDBIncrementalKeyedStateHandle(stateBackend.jobId,
-                               stateBackend.operatorIdentifier, 
stateBackend.keyGroupRange,
-                               checkpointId, newSstFiles, oldSstFiles, 
miscFiles, metaStateHandle);
+                       return new RocksDBIncrementalKeyedStateHandle(
+                               stateBackend.operatorIdentifier,
+                               stateBackend.keyGroupRange,
+                               checkpointId,
+                               newSstFiles,
+                               oldSstFiles,
+                               miscFiles,
+                               metaStateHandle);
                }
 
                void stop() {
@@ -922,6 +935,39 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                }
                        }
                }
+
+               /**
+                * A placeholder state handle for shared state that will be 
replaced by an original that was
+                * created in a previous checkpoint. So we don't have to send 
the handle twice, e.g. in
+                * case of {@link ByteStreamStateHandle}.
+                */
+               private static final class PlaceholderStreamStateHandle 
implements StreamStateHandle {
+
+                       private static final long serialVersionUID = 1L;
+
+                       /** We remember the size of the original file for which 
this is a placeholder */
+                       private final long originalSize;
+
+                       public PlaceholderStreamStateHandle(long originalSize) {
+                               this.originalSize = originalSize;
+                       }
+
+                       @Override
+                       public FSDataInputStream openInputStream() {
+                               throw new UnsupportedOperationException(
+                                       "This is only a placeholder to be 
replaced by a real StreamStateHandle in the checkpoint coordinator.");
+                       }
+
+                       @Override
+                       public void discardState() throws Exception {
+                               // nothing to do.
+                       }
+
+                       @Override
+                       public long getStateSize() {
+                               return originalSize;
+                       }
+               }
        }
 
        @Override
@@ -1260,7 +1306,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                UUID.randomUUID().toString());
 
                        try {
-                               Map<String, StreamStateHandle> newSstFiles = 
restoreStateHandle.getNewSstFiles();
+                               Map<String, StreamStateHandle> newSstFiles = 
restoreStateHandle.getUnregisteredSstFiles();
                                for (Map.Entry<String, StreamStateHandle> 
newSstFileEntry : newSstFiles.entrySet()) {
                                        String fileName = 
newSstFileEntry.getKey();
                                        StreamStateHandle remoteFileHandle = 
newSstFileEntry.getValue();
@@ -1268,7 +1314,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                        readStateData(new 
Path(restoreInstancePath, fileName), remoteFileHandle);
                                }
 
-                               Map<String, StreamStateHandle> oldSstFiles = 
restoreStateHandle.getOldSstFiles();
+                               Map<String, StreamStateHandle> oldSstFiles = 
restoreStateHandle.getRegisteredSstFiles();
                                for (Map.Entry<String, StreamStateHandle> 
oldSstFileEntry : oldSstFiles.entrySet()) {
                                        String fileName = 
oldSstFileEntry.getKey();
                                        StreamStateHandle remoteFileHandle = 
oldSstFileEntry.getValue();

http://git-wip-us.apache.org/repos/asf/flink/blob/b30b8eef/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
index 2cb43ac..dbf4642 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
@@ -18,91 +18,137 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Executor;
 
 /**
  * A {@code SharedStateRegistry} will be deployed in the 
- * {@link org.apache.flink.runtime.checkpoint.CheckpointCoordinator} to 
+ * {@link org.apache.flink.runtime.checkpoint.CompletedCheckpointStore} to
  * maintain the reference count of {@link SharedStateHandle}s which are shared
- * among different checkpoints.
- *
+ * among different incremental checkpoints.
  */
 public class SharedStateRegistry {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(SharedStateRegistry.class);
 
        /** All registered state objects by an artificial key */
-       private final Map<String, SharedStateRegistry.SharedStateEntry> 
registeredStates;
+       private final Map<SharedStateRegistryKey, 
SharedStateRegistry.SharedStateEntry> registeredStates;
+
+       /** Executor for async state deletion */
+       private final Executor asyncDisposalExecutor;
 
        public SharedStateRegistry() {
                this.registeredStates = new HashMap<>();
+               this.asyncDisposalExecutor = Executors.directExecutor(); 
//TODO: FLINK-6534
        }
 
        /**
-        * Register a reference to the given shared state in the registry. This 
increases the reference
-        * count for the this shared state by one. Returns the reference count 
after the update.
+        * Register a reference to the given (supposedly new) shared state in 
the registry.
+        * This does the following: We check if the state handle is actually 
new by the
+        * registrationKey. If it is new, we register it with a reference count 
of 1. If there is
+        * already a state handle registered under the given key, we dispose 
the given "new" state
+        * handle, uptick the reference count of the previously existing state 
handle and return it as
+        * a replacement with the result.
+        *
+        * <p>IMPORTANT: caller should check the state handle returned by the 
result, because the
+        * registry is performing deduplication and could potentially return a 
handle that is supposed
+        * to replace the one from the registration request.
         *
         * @param state the shared state for which we register a reference.
-        * @return the updated reference count for the given shared state.
+        * @return the result of this registration request, consisting of the 
state handle that is
+        * registered under the key by the end of the operation and its current 
reference count.
         */
-       public int register(SharedStateHandle state) {
-               if (state == null) {
-                       return 0;
-               }
+       public Result registerNewReference(SharedStateRegistryKey 
registrationKey, StreamStateHandle state) {
+
+               Preconditions.checkNotNull(state);
+
+               StreamStateHandle scheduledStateDeletion = null;
+               SharedStateRegistry.SharedStateEntry entry;
 
                synchronized (registeredStates) {
-                       SharedStateRegistry.SharedStateEntry entry =
-                               
registeredStates.get(state.getRegistrationKey());
+                       entry = registeredStates.get(registrationKey);
 
                        if (entry == null) {
-                               SharedStateRegistry.SharedStateEntry stateEntry 
=
-                                       new 
SharedStateRegistry.SharedStateEntry(state);
-                               
registeredStates.put(state.getRegistrationKey(), stateEntry);
-                               return 1;
+                               entry = new 
SharedStateRegistry.SharedStateEntry(state);
+                               registeredStates.put(registrationKey, entry);
                        } else {
+                               // delete if this is a real duplicate
+                               if (!Objects.equals(state, entry.state)) {
+                                       scheduledStateDeletion = state;
+                               }
                                entry.increaseReferenceCount();
-                               return entry.getReferenceCount();
                        }
                }
+
+               scheduleAsyncDelete(scheduledStateDeletion);
+               return new Result(entry);
        }
 
        /**
-        * Unregister one reference to the given shared state in the registry. 
This decreases the
-        * reference count by one. Once the count reaches zero, the shared 
state is deleted.
+        * Obtains one reference to the given shared state in the registry. 
This increases the
+        * reference count by one.
         *
-        * @param state the shared state for which we unregister a reference.
-        * @return the reference count for the shared state after the update.
+        * @param registrationKey the shared state for which we obtain a 
reference.
+        * @return the shared state for which we release a reference.
+        * @return the result of the request, consisting of the reference count 
after this operation
+        * and the state handle.
         */
-       public int unregister(SharedStateHandle state) {
-               if (state == null) {
-                       return 0;
+       public Result obtainReference(SharedStateRegistryKey registrationKey) {
+
+               Preconditions.checkNotNull(registrationKey);
+
+               synchronized (registeredStates) {
+                       SharedStateRegistry.SharedStateEntry entry =
+                               
Preconditions.checkNotNull(registeredStates.get(registrationKey),
+                                       "Could not find a state for the given 
registration key!");
+                       entry.increaseReferenceCount();
+                       return new Result(entry);
                }
+       }
+
+       /**
+        * Releases one reference to the given shared state in the registry. 
This decreases the
+        * reference count by one. Once the count reaches zero, the shared 
state is deleted.
+        *
+        * @param registrationKey the shared state for which we release a 
reference.
+        * @return the result of the request, consisting of the reference count 
after this operation
+        * and the state handle, or null if the state handle was deleted 
through this request.
+        */
+       public Result releaseReference(SharedStateRegistryKey registrationKey) {
+
+               Preconditions.checkNotNull(registrationKey);
+
+               final Result result;
+               final StreamStateHandle scheduledStateDeletion;
 
                synchronized (registeredStates) {
-                       SharedStateRegistry.SharedStateEntry entry = 
registeredStates.get(state.getRegistrationKey());
+                       SharedStateRegistry.SharedStateEntry entry = 
registeredStates.get(registrationKey);
 
-                       Preconditions.checkState(entry != null, "Cannot 
unregister a state that is not registered.");
+                       Preconditions.checkState(entry != null,
+                               "Cannot unregister a state that is not 
registered.");
 
                        entry.decreaseReferenceCount();
 
-                       final int newReferenceCount = entry.getReferenceCount();
-
                        // Remove the state from the registry when it's not 
referenced any more.
-                       if (newReferenceCount <= 0) {
-                               
registeredStates.remove(state.getRegistrationKey());
-                               try {
-                                       entry.getState().discardState();
-                               } catch (Exception e) {
-                                       LOG.warn("Cannot properly discard the 
state {}.", entry.getState(), e);
-                               }
+                       if (entry.getReferenceCount() <= 0) {
+                               registeredStates.remove(registrationKey);
+                               scheduledStateDeletion = entry.getState();
+                               result = new Result(null, 0);
+                       } else {
+                               scheduledStateDeletion = null;
+                               result = new Result(entry);
                        }
-                       return newReferenceCount;
                }
+
+               scheduleAsyncDelete(scheduledStateDeletion);
+               return result;
        }
 
        /**
@@ -122,8 +168,6 @@ public class SharedStateRegistry {
                }
        }
 
-
-
        /**
         * Unregister all the shared states referenced by the given.
         *
@@ -141,20 +185,30 @@ public class SharedStateRegistry {
                }
        }
 
+       private void scheduleAsyncDelete(StreamStateHandle streamStateHandle) {
+               if (streamStateHandle != null) {
+                       asyncDisposalExecutor.execute(
+                               new 
SharedStateRegistry.AsyncDisposalRunnable(streamStateHandle));
+               }
+       }
+
+       /**
+        * An entry in the registry, tracking the handle and the corresponding 
reference count.
+        */
        private static class SharedStateEntry {
 
-               /** The shared object */
-               private final SharedStateHandle state;
+               /** The shared state handle */
+               private final StreamStateHandle state;
 
-               /** The reference count of the object */
+               /** The current reference count of the state handle */
                private int referenceCount;
 
-               SharedStateEntry(SharedStateHandle value) {
+               SharedStateEntry(StreamStateHandle value) {
                        this.state = value;
                        this.referenceCount = 1;
                }
 
-               SharedStateHandle getState() {
+               StreamStateHandle getState() {
                        return state;
                }
 
@@ -171,14 +225,56 @@ public class SharedStateRegistry {
                }
        }
 
-       public int getReferenceCount(SharedStateHandle state) {
-               if (state == null) {
-                       return 0;
+       /**
+        * The result of an attempt to (un)reference a state handle.
+        */
+       public static class Result {
+
+               /** The (un)registered state handle from the request */
+               private final StreamStateHandle reference;
+
+               /** The reference count to the state handle after the request 
to (un)register */
+               private final int referenceCount;
+
+               private Result(SharedStateEntry sharedStateEntry) {
+                       this.reference = sharedStateEntry.getState();
+                       this.referenceCount = 
sharedStateEntry.getReferenceCount();
                }
 
-               SharedStateRegistry.SharedStateEntry entry =
-                       registeredStates.get(state.getRegistrationKey());
+               public Result(StreamStateHandle reference, int referenceCount) {
+                       Preconditions.checkArgument(referenceCount >= 0);
 
-               return entry == null ? 0 : entry.getReferenceCount();
+                       this.reference = reference;
+                       this.referenceCount = referenceCount;
+               }
+
+               public StreamStateHandle getReference() {
+                       return reference;
+               }
+
+               public int getReferenceCount() {
+                       return referenceCount;
+               }
+       }
+
+       /**
+        * Encapsulates the operation to delete state handles asynchronously.
+        */
+       private static final class AsyncDisposalRunnable implements Runnable {
+
+               private final StateObject toDispose;
+
+               public AsyncDisposalRunnable(StateObject toDispose) {
+                       this.toDispose = Preconditions.checkNotNull(toDispose);
+               }
+
+               @Override
+               public void run() {
+                       try {
+                               toDispose.discardState();
+                       } catch (Exception e) {
+                               LOG.warn("A problem occurred during 
asynchronous disposal of a shared state object: {}", toDispose, e);
+                       }
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b30b8eef/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryKey.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryKey.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryKey.java
new file mode 100644
index 0000000..9e59359
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryKey.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * This class represents a key that uniquely identifies (on a logical level) 
state handles for
+ * registration in the {@link SharedStateRegistry}. Two files which should 
logically
+ * be the same should have the same {@link SharedStateRegistryKey}. The 
meaning of logical
+ * equivalence is up to the application.
+ */
+public class SharedStateRegistryKey implements Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       /** Uses a String as internal representation */
+       private final String keyString;
+
+       public SharedStateRegistryKey(String keyString) {
+               this.keyString = Preconditions.checkNotNull(keyString);
+       }
+
+       public String getKeyString() {
+               return keyString;
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+
+               SharedStateRegistryKey that = (SharedStateRegistryKey) o;
+               return keyString.equals(that.keyString);
+       }
+
+       @Override
+       public int hashCode() {
+               return keyString.hashCode();
+       }
+
+       @Override
+       public String toString() {
+               return keyString;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b30b8eef/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
index 821bb69..03e2a13 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
@@ -19,9 +19,14 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.core.fs.FSDataInputStream;
 import org.junit.Test;
 
+import java.io.IOException;
+
+import static junit.framework.TestCase.assertFalse;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class SharedStateRegistryTest {
 
@@ -30,24 +35,50 @@ public class SharedStateRegistryTest {
         */
        @Test
        public void testRegistryNormal() {
+
                SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistry();
 
                // register one state
                TestSharedState firstState = new TestSharedState("first");
-               assertEquals(1, sharedStateRegistry.register(firstState));
+               SharedStateRegistry.Result result = 
sharedStateRegistry.registerNewReference(firstState.getRegistrationKey(), 
firstState);
+               assertEquals(1, result.getReferenceCount());
+               assertTrue(firstState == result.getReference());
+               assertFalse(firstState.isDiscarded());
 
                // register another state
                TestSharedState secondState = new TestSharedState("second");
-               assertEquals(1, sharedStateRegistry.register(secondState));
-
-               // register the first state again
-               assertEquals(2, sharedStateRegistry.register(firstState));
+               result = 
sharedStateRegistry.registerNewReference(secondState.getRegistrationKey(), 
secondState);
+               assertEquals(1, result.getReferenceCount());
+               assertTrue(secondState == result.getReference());
+               assertFalse(firstState.isDiscarded());
+               assertFalse(secondState.isDiscarded());
+
+               // attempt to register state under an existing key
+               TestSharedState firstStatePrime = new 
TestSharedState(firstState.getRegistrationKey().getKeyString());
+               result = 
sharedStateRegistry.registerNewReference(firstState.getRegistrationKey(), 
firstStatePrime);
+               assertEquals(2, result.getReferenceCount());
+               assertFalse(firstStatePrime == result.getReference());
+               assertTrue(firstState == result.getReference());
+               assertTrue(firstStatePrime.isDiscarded());
+               assertFalse(firstState.isDiscarded());
+
+               // reference the first state again
+               result = 
sharedStateRegistry.obtainReference(firstState.getRegistrationKey());
+               assertEquals(3, result.getReferenceCount());
+               assertTrue(firstState == result.getReference());
+               assertFalse(firstState.isDiscarded());
 
                // unregister the second state
-               assertEquals(0, sharedStateRegistry.unregister(secondState));
+               result = 
sharedStateRegistry.releaseReference(secondState.getRegistrationKey());
+               assertEquals(0, result.getReferenceCount());
+               assertTrue(result.getReference() == null);
+               assertTrue(secondState.isDiscarded());
 
                // unregister the first state
-               assertEquals(1, sharedStateRegistry.unregister(firstState));
+               result = 
sharedStateRegistry.releaseReference(firstState.getRegistrationKey());
+               assertEquals(2, result.getReferenceCount());
+               assertTrue(firstState == result.getReference());
+               assertFalse(firstState.isDiscarded());
        }
 
        /**
@@ -56,51 +87,47 @@ public class SharedStateRegistryTest {
        @Test(expected = IllegalStateException.class)
        public void testUnregisterWithUnexistedKey() {
                SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistry();
-
-               sharedStateRegistry.unregister(new 
TestSharedState("unexisted"));
+               sharedStateRegistry.releaseReference(new 
SharedStateRegistryKey("non-existent"));
        }
 
-       private static class TestSharedState implements SharedStateHandle {
+       private static class TestSharedState implements StreamStateHandle {
                private static final long serialVersionUID = 
4468635881465159780L;
 
-               private String key;
+               private SharedStateRegistryKey key;
+
+               private boolean discarded;
 
                TestSharedState(String key) {
-                       this.key = key;
+                       this.key = new SharedStateRegistryKey(key);
+                       this.discarded = false;
                }
 
-               @Override
-               public String getRegistrationKey() {
+               public SharedStateRegistryKey getRegistrationKey() {
                        return key;
                }
 
                @Override
                public void discardState() throws Exception {
-                       // nothing to do
+                       this.discarded = true;
                }
 
                @Override
                public long getStateSize() {
-                       return key.length();
+                       return key.toString().length();
                }
 
                @Override
-               public boolean equals(Object o) {
-                       if (this == o) {
-                               return true;
-                       }
-                       if (o == null || getClass() != o.getClass()) {
-                               return false;
-                       }
-
-                       TestSharedState testState = (TestSharedState) o;
-
-                       return key.equals(testState.key);
+               public int hashCode() {
+                       return key.hashCode();
                }
 
                @Override
-               public int hashCode() {
-                       return key.hashCode();
+               public FSDataInputStream openInputStream() throws IOException {
+                       throw new UnsupportedOperationException();
+               }
+
+               public boolean isDiscarded() {
+                       return discarded;
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b30b8eef/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 658ccde..ca66ffb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -482,6 +482,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
        @SuppressWarnings("unchecked")
        public void testKryoRegisteringRestoreResilienceWithDefaultSerializer() 
throws Exception {
                CheckpointStreamFactory streamFactory = createStreamFactory();
+               SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistry();
                Environment env = new DummyEnvironment("test", 1, 0);
                AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE, env);
 
@@ -509,6 +510,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                                streamFactory,
                                CheckpointOptions.forFullCheckpoint()));
 
+               snapshot.registerSharedStates(sharedStateRegistry);
                backend.dispose();
 
                // ========== restore snapshot - should use default serializer 
(ONLY SERIALIZATION) ==========
@@ -518,8 +520,6 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
 
                backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, 
env);
 
-               snapshot.discardState();
-
                // re-initialize to ensure that we create the KryoSerializer 
from scratch, otherwise
                // initializeSerializerUnlessSet would not pick up our new 
config
                kvId = new ValueStateDescriptor<>("id", pojoType);
@@ -536,6 +536,11 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                                streamFactory,
                                CheckpointOptions.forFullCheckpoint()));
 
+               snapshot2.registerSharedStates(sharedStateRegistry);
+
+               snapshot.unregisterSharedStates(sharedStateRegistry);
+               snapshot.discardState();
+
                backend.dispose();
 
                // ========= restore snapshot - should use default serializer 
(FAIL ON DESERIALIZATION) =========
@@ -570,6 +575,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
        @Test
        public void 
testKryoRegisteringRestoreResilienceWithRegisteredSerializer() throws Exception 
{
                CheckpointStreamFactory streamFactory = createStreamFactory();
+               SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistry();
                Environment env = new DummyEnvironment("test", 1, 0);
 
                AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE, env);
@@ -597,6 +603,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                                streamFactory,
                                CheckpointOptions.forFullCheckpoint()));
 
+               snapshot.registerSharedStates(sharedStateRegistry);
                backend.dispose();
 
                // ========== restore snapshot - should use specific serializer 
(ONLY SERIALIZATION) ==========
@@ -605,8 +612,6 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
 
                backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, 
env);
 
-               snapshot.discardState();
-
                // re-initialize to ensure that we create the KryoSerializer 
from scratch, otherwise
                // initializeSerializerUnlessSet would not pick up our new 
config
                kvId = new ValueStateDescriptor<>("id", pojoType);
@@ -623,6 +628,11 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                                streamFactory,
                                CheckpointOptions.forFullCheckpoint()));
 
+               snapshot2.registerSharedStates(sharedStateRegistry);
+
+               snapshot.unregisterSharedStates(sharedStateRegistry);
+               snapshot.discardState();
+
                backend.dispose();
 
                // ========= restore snapshot - should use specific serializer 
(FAIL ON DESERIALIZATION) =========

Reply via email to