This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5bc04d255bdf850de73c0ff67e79fbb73fb299ce
Author: Stefan Richter <[email protected]>
AuthorDate: Tue Feb 26 16:08:48 2019 +0100

    [hotfix] Introduce common interface to all IncrementalKeyedStateHandles
---
 .../savepoint/SavepointV2Serializer.java           |  10 +-
 .../runtime/state/IncrementalKeyedStateHandle.java | 307 +--------------------
 .../state/IncrementalLocalKeyedStateHandle.java    |   7 +-
 ...java => IncrementalRemoteKeyedStateHandle.java} |  25 +-
 .../state/PlaceholderStreamStateHandle.java        |   2 +-
 .../checkpoint/CheckpointCoordinatorTest.java      |  10 +-
 .../checkpoint/savepoint/CheckpointTestUtils.java  |   6 +-
 ... => IncrementalRemoteKeyedStateHandleTest.java} |  22 +-
 .../state/RocksDBKeyedStateBackendBuilder.java     |   5 +-
 .../streaming/state/RocksDBStateDownloader.java    |   4 +-
 .../RocksDBIncrementalRestoreOperation.java        |  48 ++--
 .../snapshot/RocksIncrementalSnapshotStrategy.java |   6 +-
 .../streaming/state/RocksDBStateBackendTest.java   |  10 +-
 .../state/RocksDBStateDownloaderTest.java          |  10 +-
 14 files changed, 97 insertions(+), 375 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
index fa84077..fb942f7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
@@ -25,8 +25,8 @@ import org.apache.flink.runtime.checkpoint.MasterState;
 import org.apache.flink.runtime.checkpoint.OperatorState;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
@@ -340,9 +340,9 @@ public class SavepointV2Serializer implements 
SavepointSerializer<SavepointV2> {
                                
dos.writeLong(keyGroupsStateHandle.getOffsetForKeyGroup(keyGroup));
                        }
                        
serializeStreamStateHandle(keyGroupsStateHandle.getDelegateStateHandle(), dos);
-               } else if (stateHandle instanceof IncrementalKeyedStateHandle) {
-                       IncrementalKeyedStateHandle incrementalKeyedStateHandle 
=
-                               (IncrementalKeyedStateHandle) stateHandle;
+               } else if (stateHandle instanceof 
IncrementalRemoteKeyedStateHandle) {
+                       IncrementalRemoteKeyedStateHandle 
incrementalKeyedStateHandle =
+                               (IncrementalRemoteKeyedStateHandle) stateHandle;
 
                        dos.writeByte(INCREMENTAL_KEY_GROUPS_HANDLE);
 
@@ -427,7 +427,7 @@ public class SavepointV2Serializer implements 
SavepointSerializer<SavepointV2> {
                                uuid = 
UUID.nameUUIDFromBytes(backendId.getBytes(StandardCharsets.UTF_8));
                        }
 
-                       return new IncrementalKeyedStateHandle(
+                       return new IncrementalRemoteKeyedStateHandle(
                                uuid,
                                keyGroupRange,
                                checkpointId,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
index 01d4ac0..d3ab3b2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,305 +18,24 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.util.Preconditions;
+import javax.annotation.Nonnull;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 
 /**
- * The handle to states of an incremental snapshot.
- * <p>
- * The states contained in an incremental snapshot include:
- * <ul>
- * <li> Created shared state which includes shared files produced since the 
last
- * completed checkpoint. These files can be referenced by succeeding 
checkpoints if the
- * checkpoint succeeds to complete. </li>
- * <li> Referenced shared state which includes the shared files materialized 
in previous
- * checkpoints. Until we this is registered to a {@link SharedStateRegistry}, 
all referenced
- * shared state handles are only placeholders, so that we do not send state 
handles twice
- * from which we know that they already exist on the checkpoint 
coordinator.</li>
- * <li> Private state which includes all other files, typically mutable, that 
cannot be shared by
- * other checkpoints. </li>
- * <li> Backend meta state which includes the information of existing states. 
</li>
- * </ul>
- *
- * When this should become a completed checkpoint on the checkpoint 
coordinator, it must first be
- * registered with a {@link SharedStateRegistry}, so that all placeholder 
state handles to
- * previously existing state are replaced with the originals.
- *
- * IMPORTANT: This class currently overrides equals and hash code only for 
testing purposes. They
- * should not be called from production code. This means this class is also 
not suited to serve as
- * a key, e.g. in hash maps.
+ * Common interface to all incremental {@link KeyedStateHandle}.
  */
-public class IncrementalKeyedStateHandle implements KeyedStateHandle {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(IncrementalKeyedStateHandle.class);
-
-       private static final long serialVersionUID = -8328808513197388231L;
-
-       /**
-        * UUID to identify the backend which created this state handle. This 
is in creating the key for the
-        * {@link SharedStateRegistry}.
-        */
-       private final UUID backendIdentifier;
-
-       /**
-        * The key-group range covered by this state handle.
-        */
-       private final KeyGroupRange keyGroupRange;
-
-       /**
-        * The checkpoint Id.
-        */
-       private final long checkpointId;
-
-       /**
-        * Shared state in the incremental checkpoint.
-        */
-       private final Map<StateHandleID, StreamStateHandle> sharedState;
-
-       /**
-        * Private state in the incremental checkpoint.
-        */
-       private final Map<StateHandleID, StreamStateHandle> privateState;
-
-       /**
-        * Primary meta data state of the incremental checkpoint.
-        */
-       private final StreamStateHandle metaStateHandle;
-
-       /**
-        * Once the shared states are registered, it is the {@link 
SharedStateRegistry}'s
-        * responsibility to cleanup those shared states.
-        * But in the cases where the state handle is discarded before 
performing the registration,
-        * the handle should delete all the shared states created by it.
-        *
-        * This variable is not null iff the handles was registered.
-        */
-       private transient SharedStateRegistry sharedStateRegistry;
-
-       public IncrementalKeyedStateHandle(
-               UUID backendIdentifier,
-               KeyGroupRange keyGroupRange,
-               long checkpointId,
-               Map<StateHandleID, StreamStateHandle> sharedState,
-               Map<StateHandleID, StreamStateHandle> privateState,
-               StreamStateHandle metaStateHandle) {
-
-               this.backendIdentifier = 
Preconditions.checkNotNull(backendIdentifier);
-               this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
-               this.checkpointId = checkpointId;
-               this.sharedState = Preconditions.checkNotNull(sharedState);
-               this.privateState = Preconditions.checkNotNull(privateState);
-               this.metaStateHandle = 
Preconditions.checkNotNull(metaStateHandle);
-               this.sharedStateRegistry = null;
-       }
-
-       @Override
-       public KeyGroupRange getKeyGroupRange() {
-               return keyGroupRange;
-       }
-
-       public long getCheckpointId() {
-               return checkpointId;
-       }
-
-       public Map<StateHandleID, StreamStateHandle> getSharedState() {
-               return sharedState;
-       }
-
-       public Map<StateHandleID, StreamStateHandle> getPrivateState() {
-               return privateState;
-       }
-
-       public StreamStateHandle getMetaStateHandle() {
-               return metaStateHandle;
-       }
-
-       public UUID getBackendIdentifier() {
-               return backendIdentifier;
-       }
-
-       public SharedStateRegistry getSharedStateRegistry() {
-               return sharedStateRegistry;
-       }
-
-       @Override
-       public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
-               return 
KeyGroupRange.EMPTY_KEY_GROUP_RANGE.equals(this.keyGroupRange.getIntersection(keyGroupRange))
 ?
-                       null : this;
-       }
+public interface IncrementalKeyedStateHandle extends KeyedStateHandle {
 
-       @Override
-       public void discardState() throws Exception {
+       /** Returns the ID of the checkpoint for which the handle was created. 
*/
+       long getCheckpointId();
 
-               SharedStateRegistry registry = this.sharedStateRegistry;
-               final boolean isRegistered = (registry != null);
+       /** Returns the identifier of the state backend from which this handle 
was created.*/
+       @Nonnull
+       UUID getBackendIdentifier();
 
-               LOG.trace("Discarding IncrementalKeyedStateHandle (registered = 
{}) for checkpoint {} from backend with id {}.",
-                       isRegistered,
-                       checkpointId,
-                       backendIdentifier);
-
-               try {
-                       metaStateHandle.discardState();
-               } catch (Exception e) {
-                       LOG.warn("Could not properly discard meta data.", e);
-               }
-
-               try {
-                       
StateUtil.bestEffortDiscardAllStateObjects(privateState.values());
-               } catch (Exception e) {
-                       LOG.warn("Could not properly discard misc file 
states.", e);
-               }
-
-               // If this was not registered, we can delete the shared state. 
We can simply apply this
-               // to all handles, because all handles that have not been 
created for the first time for this
-               // are only placeholders at this point (disposing them is a 
NOP).
-               if (isRegistered) {
-                       // If this was registered, we only unregister all our 
referenced shared states
-                       // from the registry.
-                       for (StateHandleID stateHandleID : 
sharedState.keySet()) {
-                               registry.unregisterReference(
-                                       
createSharedStateRegistryKeyFromFileName(stateHandleID));
-                       }
-               } else {
-                       // Otherwise, we assume to own those handles and 
dispose them directly.
-                       try {
-                               
StateUtil.bestEffortDiscardAllStateObjects(sharedState.values());
-                       } catch (Exception e) {
-                               LOG.warn("Could not properly discard new sst 
file states.", e);
-                       }
-               }
-       }
-
-       @Override
-       public long getStateSize() {
-               long size = StateUtil.getStateSize(metaStateHandle);
-
-               for (StreamStateHandle sharedStateHandle : 
sharedState.values()) {
-                       size += sharedStateHandle.getStateSize();
-               }
-
-               for (StreamStateHandle privateStateHandle : 
privateState.values()) {
-                       size += privateStateHandle.getStateSize();
-               }
-
-               return size;
-       }
-
-       @Override
-       public void registerSharedStates(SharedStateRegistry stateRegistry) {
-
-               // This is a quick check to avoid that we register twice with 
the same registry. However, the code allows to
-               // register again with a different registry. The implication is 
that ownership is transferred to this new
-               // registry. This should only happen in case of a restart, when 
the CheckpointCoordinator creates a new
-               // SharedStateRegistry for the current attempt and the old 
registry becomes meaningless. We also assume that
-               // an old registry object from a previous run is due to be GCed 
and will never be used for registration again.
-               Preconditions.checkState(
-                       sharedStateRegistry != stateRegistry,
-                       "The state handle has already registered its shared 
states to the given registry.");
-
-               sharedStateRegistry = Preconditions.checkNotNull(stateRegistry);
-
-               LOG.trace("Registering IncrementalKeyedStateHandle for 
checkpoint {} from backend with id {}.",
-                       checkpointId,
-                       backendIdentifier);
-
-               for (Map.Entry<StateHandleID, StreamStateHandle> 
sharedStateHandle : sharedState.entrySet()) {
-                       SharedStateRegistryKey registryKey =
-                               
createSharedStateRegistryKeyFromFileName(sharedStateHandle.getKey());
-
-                       SharedStateRegistry.Result result =
-                               stateRegistry.registerReference(registryKey, 
sharedStateHandle.getValue());
-
-                       // This step consolidates our shared handles with the 
registry, which does two things:
-                       //
-                       // 1) Replace placeholder state handle with already 
registered, actual state handles.
-                       //
-                       // 2) Deduplicate re-uploads of incremental state due 
to missing confirmations about
-                       // completed checkpoints.
-                       //
-                       // This prevents the following problem:
-                       // A previous checkpoint n has already registered the 
state. This can happen if a
-                       // following checkpoint (n + x) wants to reference the 
same state before the backend got
-                       // notified that checkpoint n completed. In this case, 
the shared registry did
-                       // deduplication and returns the previous reference.
-                       sharedStateHandle.setValue(result.getReference());
-               }
-       }
-
-       /**
-        * Create a unique key to register one of our shared state handles.
-        */
-       @VisibleForTesting
-       public SharedStateRegistryKey 
createSharedStateRegistryKeyFromFileName(StateHandleID shId) {
-               return new 
SharedStateRegistryKey(String.valueOf(backendIdentifier) + '-' + keyGroupRange, 
shId);
-       }
-
-       /**
-        * This method is should only be called in tests! This should never 
serve as key in a hash map.
-        */
-       @VisibleForTesting
-       @Override
-       public boolean equals(Object o) {
-               if (this == o) {
-                       return true;
-               }
-               if (o == null || getClass() != o.getClass()) {
-                       return false;
-               }
-
-               IncrementalKeyedStateHandle that = 
(IncrementalKeyedStateHandle) o;
-
-               if (getCheckpointId() != that.getCheckpointId()) {
-                       return false;
-               }
-               if 
(!getBackendIdentifier().equals(that.getBackendIdentifier())) {
-                       return false;
-               }
-               if (!getKeyGroupRange().equals(that.getKeyGroupRange())) {
-                       return false;
-               }
-               if (!getSharedState().equals(that.getSharedState())) {
-                       return false;
-               }
-               if (!getPrivateState().equals(that.getPrivateState())) {
-                       return false;
-               }
-               return getMetaStateHandle().equals(that.getMetaStateHandle());
-       }
-
-       /**
-        * This method should only be called in tests! This should never serve 
as key in a hash map.
-        */
-       @VisibleForTesting
-       @Override
-       public int hashCode() {
-               int result = getBackendIdentifier().hashCode();
-               result = 31 * result + getKeyGroupRange().hashCode();
-               result = 31 * result + (int) (getCheckpointId() ^ 
(getCheckpointId() >>> 32));
-               result = 31 * result + getSharedState().hashCode();
-               result = 31 * result + getPrivateState().hashCode();
-               result = 31 * result + getMetaStateHandle().hashCode();
-               return result;
-       }
-
-       @Override
-       public String toString() {
-               return "IncrementalKeyedStateHandle{" +
-                       "backendIdentifier=" + backendIdentifier +
-                       ", keyGroupRange=" + keyGroupRange +
-                       ", checkpointId=" + checkpointId +
-                       ", sharedState=" + sharedState +
-                       ", privateState=" + privateState +
-                       ", metaStateHandle=" + metaStateHandle +
-                       ", registered=" + (sharedStateRegistry != null) +
-                       '}';
-       }
+       /** Returns a set of ids of all registered shared states in the backend 
at the time this was created. */
+       @Nonnull
+       Set<StateHandleID> getSharedStateHandleIDs();
 }
-
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java
index f80a8ce..16ee2bb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java
@@ -27,11 +27,11 @@ import java.util.Set;
 import java.util.UUID;
 
 /**
- * State handle for local copies of {@link IncrementalKeyedStateHandle}. 
Consists of a {@link DirectoryStateHandle} that
+ * State handle for local copies of {@link IncrementalRemoteKeyedStateHandle}. 
Consists of a {@link DirectoryStateHandle} that
  * represents the directory of the native RocksDB snapshot, the key groups, 
and a stream state handle for Flink's state
  * meta data file.
  */
-public class IncrementalLocalKeyedStateHandle extends 
DirectoryKeyedStateHandle {
+public class IncrementalLocalKeyedStateHandle extends 
DirectoryKeyedStateHandle implements IncrementalKeyedStateHandle {
 
        private static final long serialVersionUID = 1L;
 
@@ -71,15 +71,18 @@ public class IncrementalLocalKeyedStateHandle extends 
DirectoryKeyedStateHandle
                return metaDataState;
        }
 
+       @Override
        public long getCheckpointId() {
                return checkpointId;
        }
 
+       @Override
        @Nonnull
        public UUID getBackendIdentifier() {
                return backendIdentifier;
        }
 
+       @Override
        @Nonnull
        public Set<StateHandleID> getSharedStateHandleIDs() {
                return sharedStateHandleIDs;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java
similarity index 93%
copy from 
flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
copy to 
flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java
index 01d4ac0..77fd491 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java
@@ -24,7 +24,10 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 
 /**
@@ -52,9 +55,9 @@ import java.util.UUID;
  * should not be called from production code. This means this class is also 
not suited to serve as
  * a key, e.g. in hash maps.
  */
-public class IncrementalKeyedStateHandle implements KeyedStateHandle {
+public class IncrementalRemoteKeyedStateHandle implements 
IncrementalKeyedStateHandle {
 
-       private static final Logger LOG = 
LoggerFactory.getLogger(IncrementalKeyedStateHandle.class);
+       private static final Logger LOG = 
LoggerFactory.getLogger(IncrementalRemoteKeyedStateHandle.class);
 
        private static final long serialVersionUID = -8328808513197388231L;
 
@@ -99,7 +102,7 @@ public class IncrementalKeyedStateHandle implements 
KeyedStateHandle {
         */
        private transient SharedStateRegistry sharedStateRegistry;
 
-       public IncrementalKeyedStateHandle(
+       public IncrementalRemoteKeyedStateHandle(
                UUID backendIdentifier,
                KeyGroupRange keyGroupRange,
                long checkpointId,
@@ -121,6 +124,7 @@ public class IncrementalKeyedStateHandle implements 
KeyedStateHandle {
                return keyGroupRange;
        }
 
+       @Override
        public long getCheckpointId() {
                return checkpointId;
        }
@@ -137,10 +141,17 @@ public class IncrementalKeyedStateHandle implements 
KeyedStateHandle {
                return metaStateHandle;
        }
 
+       @Nonnull
        public UUID getBackendIdentifier() {
                return backendIdentifier;
        }
 
+       @Nonnull
+       @Override
+       public Set<StateHandleID> getSharedStateHandleIDs() {
+               return getSharedState().keySet();
+       }
+
        public SharedStateRegistry getSharedStateRegistry() {
                return sharedStateRegistry;
        }
@@ -157,7 +168,7 @@ public class IncrementalKeyedStateHandle implements 
KeyedStateHandle {
                SharedStateRegistry registry = this.sharedStateRegistry;
                final boolean isRegistered = (registry != null);
 
-               LOG.trace("Discarding IncrementalKeyedStateHandle (registered = 
{}) for checkpoint {} from backend with id {}.",
+               LOG.trace("Discarding IncrementalRemoteKeyedStateHandle 
(registered = {}) for checkpoint {} from backend with id {}.",
                        isRegistered,
                        checkpointId,
                        backendIdentifier);
@@ -223,7 +234,7 @@ public class IncrementalKeyedStateHandle implements 
KeyedStateHandle {
 
                sharedStateRegistry = Preconditions.checkNotNull(stateRegistry);
 
-               LOG.trace("Registering IncrementalKeyedStateHandle for 
checkpoint {} from backend with id {}.",
+               LOG.trace("Registering IncrementalRemoteKeyedStateHandle for 
checkpoint {} from backend with id {}.",
                        checkpointId,
                        backendIdentifier);
 
@@ -271,7 +282,7 @@ public class IncrementalKeyedStateHandle implements 
KeyedStateHandle {
                        return false;
                }
 
-               IncrementalKeyedStateHandle that = 
(IncrementalKeyedStateHandle) o;
+               IncrementalRemoteKeyedStateHandle that = 
(IncrementalRemoteKeyedStateHandle) o;
 
                if (getCheckpointId() != that.getCheckpointId()) {
                        return false;
@@ -308,7 +319,7 @@ public class IncrementalKeyedStateHandle implements 
KeyedStateHandle {
 
        @Override
        public String toString() {
-               return "IncrementalKeyedStateHandle{" +
+               return "IncrementalRemoteKeyedStateHandle{" +
                        "backendIdentifier=" + backendIdentifier +
                        ", keyGroupRange=" + keyGroupRange +
                        ", checkpointId=" + checkpointId +
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java
index 7c948a1..17f1d00 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java
@@ -25,7 +25,7 @@ import 
org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
  * A placeholder state handle for shared state that will replaced by an 
original that was
  * created in a previous checkpoint. So we don't have to send a state handle 
twice, e.g. in
  * case of {@link ByteStreamStateHandle}. This class is used in the referenced 
states of
- * {@link IncrementalKeyedStateHandle}.
+ * {@link IncrementalRemoteKeyedStateHandle}.
  */
 public class PlaceholderStreamStateHandle implements StreamStateHandle {
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 02c34d1..1b52218 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -35,7 +35,7 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
@@ -67,8 +67,6 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.mockito.verification.VerificationMode;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -3488,7 +3486,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                        for (KeyedStateHandle keyedStateHandle 
: subtaskState.getManagedKeyedState()) {
                                                // test we are once registered 
with the current registry
                                                verify(keyedStateHandle, 
times(1)).registerSharedStates(createdSharedStateRegistries.get(0));
-                                               IncrementalKeyedStateHandle 
incrementalKeyedStateHandle = (IncrementalKeyedStateHandle) keyedStateHandle;
+                                               
IncrementalRemoteKeyedStateHandle incrementalKeyedStateHandle = 
(IncrementalRemoteKeyedStateHandle) keyedStateHandle;
 
                                                
sharedHandlesByCheckpoint.get(cp).putAll(incrementalKeyedStateHandle.getSharedState());
 
@@ -3618,8 +3616,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                new StateHandleID("shared-" + cpSequenceNumber),
                                spy(new ByteStreamStateHandle("shared-" + 
cpSequenceNumber + "-" + keyGroupRange, new byte[]{'s'})));
 
-                       IncrementalKeyedStateHandle managedState =
-                               spy(new IncrementalKeyedStateHandle(
+                       IncrementalRemoteKeyedStateHandle managedState =
+                               spy(new IncrementalRemoteKeyedStateHandle(
                                        new UUID(42L, 42L),
                                        keyGroupRange,
                                        checkpointId,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
index 1963766..55a9772 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
@@ -27,7 +27,7 @@ import org.apache.flink.runtime.checkpoint.TaskState;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
@@ -255,8 +255,8 @@ public class CheckpointTestUtils {
        private CheckpointTestUtils() {}
 
 
-       public static IncrementalKeyedStateHandle 
createDummyIncrementalKeyedStateHandle(Random rnd) {
-               return new IncrementalKeyedStateHandle(
+       public static IncrementalRemoteKeyedStateHandle 
createDummyIncrementalKeyedStateHandle(Random rnd) {
+               return new IncrementalRemoteKeyedStateHandle(
                        createRandomUUID(rnd),
                        new KeyGroupRange(1, 1),
                        42L,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java
similarity index 92%
rename from 
flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
rename to 
flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java
index 9f6f88e..dd1039a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java
@@ -32,15 +32,15 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.powermock.api.mockito.PowerMockito.spy;
 
-public class IncrementalKeyedStateHandleTest {
+public class IncrementalRemoteKeyedStateHandleTest {
 
        /**
-        * This test checks, that for an unregistered {@link 
IncrementalKeyedStateHandle} all state
+        * This test checks, that for an unregistered {@link 
IncrementalRemoteKeyedStateHandle} all state
         * (including shared) is discarded.
         */
        @Test
        public void testUnregisteredDiscarding() throws Exception {
-               IncrementalKeyedStateHandle stateHandle = create(new 
Random(42));
+               IncrementalRemoteKeyedStateHandle stateHandle = create(new 
Random(42));
 
                stateHandle.discardState();
 
@@ -56,7 +56,7 @@ public class IncrementalKeyedStateHandleTest {
        }
 
        /**
-        * This test checks, that for a registered {@link 
IncrementalKeyedStateHandle} discards respect
+        * This test checks, that for a registered {@link 
IncrementalRemoteKeyedStateHandle} discards respect
         * all shared state and only discard it one all references are released.
         */
        @Test
@@ -65,8 +65,8 @@ public class IncrementalKeyedStateHandleTest {
                SharedStateRegistry registry = spy(new SharedStateRegistry());
 
                // Create two state handles with overlapping shared state
-               IncrementalKeyedStateHandle stateHandle1 = create(new 
Random(42));
-               IncrementalKeyedStateHandle stateHandle2 = create(new 
Random(42));
+               IncrementalRemoteKeyedStateHandle stateHandle1 = create(new 
Random(42));
+               IncrementalRemoteKeyedStateHandle stateHandle2 = create(new 
Random(42));
 
                // Both handles should not be registered and not discarded by 
now.
                for (Map.Entry<StateHandleID, StreamStateHandle> entry :
@@ -197,9 +197,9 @@ public class IncrementalKeyedStateHandleTest {
 
                SharedStateRegistry stateRegistryA = spy(new 
SharedStateRegistry());
 
-               IncrementalKeyedStateHandle stateHandleX = create(new 
Random(1));
-               IncrementalKeyedStateHandle stateHandleY = create(new 
Random(2));
-               IncrementalKeyedStateHandle stateHandleZ = create(new 
Random(3));
+               IncrementalRemoteKeyedStateHandle stateHandleX = create(new 
Random(1));
+               IncrementalRemoteKeyedStateHandle stateHandleY = create(new 
Random(2));
+               IncrementalRemoteKeyedStateHandle stateHandleZ = create(new 
Random(3));
 
                // Now we register first time ...
                stateHandleX.registerSharedStates(stateRegistryA);
@@ -257,8 +257,8 @@ public class IncrementalKeyedStateHandleTest {
                sharedStateRegistryB.close();
        }
 
-       private static IncrementalKeyedStateHandle create(Random rnd) {
-               return new IncrementalKeyedStateHandle(
+       private static IncrementalRemoteKeyedStateHandle create(Random rnd) {
+               return new IncrementalRemoteKeyedStateHandle(
                        UUID.nameUUIDFromBytes("test".getBytes()),
                        KeyGroupRange.of(0, 0),
                        1L,
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index c5cf501..eb5bccc 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -36,7 +36,6 @@ import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackendBuilder;
 import org.apache.flink.runtime.state.BackendBuildingException;
 import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
-import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.LocalRecoveryConfig;
@@ -376,9 +375,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                                ttlTimeProvider);
                }
                KeyedStateHandle firstStateHandle = 
restoreStateHandles.iterator().next();
-               boolean isIncrementalStateHandle = (firstStateHandle instanceof 
IncrementalKeyedStateHandle)
-                       || (firstStateHandle instanceof 
IncrementalLocalKeyedStateHandle);
-               if (isIncrementalStateHandle) {
+               if (firstStateHandle instanceof IncrementalKeyedStateHandle) {
                        return new RocksDBIncrementalRestoreOperation<>(
                                operatorIdentifier,
                                keyGroupRange,
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
index 501f333..aa3201c 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
@@ -23,7 +23,7 @@ import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.ExceptionUtils;
@@ -54,7 +54,7 @@ public class RocksDBStateDownloader extends 
RocksDBStateDataTransfer {
         * @throws Exception Thrown if can not transfer all the state data.
         */
        public void transferAllStateDataToDirectory(
-               IncrementalKeyedStateHandle restoreStateHandle,
+               IncrementalRemoteKeyedStateHandle restoreStateHandle,
                Path dest,
                CloseableRegistry closeableRegistry) throws Exception {
 
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
index b085d51..3371f4f 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
@@ -40,6 +40,7 @@ import 
org.apache.flink.runtime.state.BackendBuildingException;
 import org.apache.flink.runtime.state.DirectoryStateHandle;
 import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
 import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
+import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.KeyedStateHandle;
@@ -156,24 +157,25 @@ public class RocksDBIncrementalRestoreOperation<K> 
extends AbstractRocksDBRestor
        /**
         * Recovery from a single remote incremental state without rescaling.
         */
-       private void restoreWithoutRescaling(KeyedStateHandle rawStateHandle) 
throws Exception {
-               if (rawStateHandle instanceof IncrementalKeyedStateHandle) {
-                       IncrementalKeyedStateHandle incrementalKeyedStateHandle 
= (IncrementalKeyedStateHandle) rawStateHandle;
-                       
restorePreviousIncrementalFilesStatus(incrementalKeyedStateHandle);
-                       restoreFromRemoteState(incrementalKeyedStateHandle);
-               } else if (rawStateHandle instanceof 
IncrementalLocalKeyedStateHandle) {
+       private void restoreWithoutRescaling(KeyedStateHandle keyedStateHandle) 
throws Exception {
+               if (keyedStateHandle instanceof 
IncrementalRemoteKeyedStateHandle) {
+                       IncrementalRemoteKeyedStateHandle 
incrementalRemoteKeyedStateHandle =
+                               (IncrementalRemoteKeyedStateHandle) 
keyedStateHandle;
+                       
restorePreviousIncrementalFilesStatus(incrementalRemoteKeyedStateHandle);
+                       
restoreFromRemoteState(incrementalRemoteKeyedStateHandle);
+               } else if (keyedStateHandle instanceof 
IncrementalLocalKeyedStateHandle) {
                        IncrementalLocalKeyedStateHandle 
incrementalLocalKeyedStateHandle =
-                               (IncrementalLocalKeyedStateHandle) 
rawStateHandle;
+                               (IncrementalLocalKeyedStateHandle) 
keyedStateHandle;
                        
restorePreviousIncrementalFilesStatus(incrementalLocalKeyedStateHandle);
                        restoreFromLocalState(incrementalLocalKeyedStateHandle);
                } else {
                        throw new BackendBuildingException("Unexpected state 
handle type, " +
-                               "expected " + IncrementalKeyedStateHandle.class 
+ " or " + IncrementalLocalKeyedStateHandle.class +
-                               ", but found " + rawStateHandle.getClass());
+                               "expected " + 
IncrementalRemoteKeyedStateHandle.class + " or " + 
IncrementalLocalKeyedStateHandle.class +
+                               ", but found " + keyedStateHandle.getClass());
                }
        }
 
-       private void 
restorePreviousIncrementalFilesStatus(IncrementalLocalKeyedStateHandle 
localKeyedStateHandle) {
+       private void 
restorePreviousIncrementalFilesStatus(IncrementalKeyedStateHandle 
localKeyedStateHandle) {
                backendUID = localKeyedStateHandle.getBackendIdentifier();
                restoredSstFiles.put(
                        localKeyedStateHandle.getCheckpointId(),
@@ -181,18 +183,10 @@ public class RocksDBIncrementalRestoreOperation<K> 
extends AbstractRocksDBRestor
                lastCompletedCheckpointId = 
localKeyedStateHandle.getCheckpointId();
        }
 
-       private void 
restorePreviousIncrementalFilesStatus(IncrementalKeyedStateHandle 
remoteKeyedStateHandle) {
-               backendUID = remoteKeyedStateHandle.getBackendIdentifier();
-               restoredSstFiles.put(
-                       remoteKeyedStateHandle.getCheckpointId(),
-                       remoteKeyedStateHandle.getSharedState().keySet());
-               lastCompletedCheckpointId = 
remoteKeyedStateHandle.getCheckpointId();
-       }
-
-       private void restoreFromRemoteState(IncrementalKeyedStateHandle 
stateHandle) throws Exception {
+       private void restoreFromRemoteState(IncrementalRemoteKeyedStateHandle 
stateHandle) throws Exception {
                final Path tmpRestoreInstancePath = new Path(
                        instanceBasePath.getAbsolutePath(),
-                       UUID.randomUUID().toString()); // used as restore 
source for IncrementalKeyedStateHandle
+                       UUID.randomUUID().toString()); // used as restore 
source for IncrementalRemoteKeyedStateHandle
                try {
                        restoreFromLocalState(
                                
transferRemoteStateToLocalDirectory(tmpRestoreInstancePath, stateHandle));
@@ -227,7 +221,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends 
AbstractRocksDBRestor
 
        private IncrementalLocalKeyedStateHandle 
transferRemoteStateToLocalDirectory(
                Path temporaryRestoreInstancePath,
-               IncrementalKeyedStateHandle restoreStateHandle) throws 
Exception {
+               IncrementalRemoteKeyedStateHandle restoreStateHandle) throws 
Exception {
 
                try (RocksDBStateDownloader rocksDBStateDownloader = new 
RocksDBStateDownloader(numberOfTransferringThreads)) {
                        rocksDBStateDownloader.transferAllStateDataToDirectory(
@@ -302,15 +296,15 @@ public class RocksDBIncrementalRestoreOperation<K> 
extends AbstractRocksDBRestor
 
                for (KeyedStateHandle rawStateHandle : restoreStateHandles) {
 
-                       if (!(rawStateHandle instanceof 
IncrementalKeyedStateHandle)) {
+                       if (!(rawStateHandle instanceof 
IncrementalRemoteKeyedStateHandle)) {
                                throw new IllegalStateException("Unexpected 
state handle type, " +
-                                       "expected " + 
IncrementalKeyedStateHandle.class +
+                                       "expected " + 
IncrementalRemoteKeyedStateHandle.class +
                                        ", but found " + 
rawStateHandle.getClass());
                        }
 
                        Path temporaryRestoreInstancePath = new 
Path(instanceBasePath.getAbsolutePath() + UUID.randomUUID().toString());
                        try (RestoredDBInstance tmpRestoreDBInfo = 
restoreDBInstanceFromStateHandle(
-                               (IncrementalKeyedStateHandle) rawStateHandle,
+                               (IncrementalRemoteKeyedStateHandle) 
rawStateHandle,
                                temporaryRestoreInstancePath);
                                RocksDBWriteBatchWrapper writeBatchWrapper = 
new RocksDBWriteBatchWrapper(this.db)) {
 
@@ -351,10 +345,10 @@ public class RocksDBIncrementalRestoreOperation<K> 
extends AbstractRocksDBRestor
 
        private void initDBWithRescaling(KeyedStateHandle initialHandle) throws 
Exception {
 
-               assert (initialHandle instanceof IncrementalKeyedStateHandle);
+               assert (initialHandle instanceof 
IncrementalRemoteKeyedStateHandle);
 
                // 1. Restore base DB from selected initial handle
-               restoreFromRemoteState((IncrementalKeyedStateHandle) 
initialHandle);
+               restoreFromRemoteState((IncrementalRemoteKeyedStateHandle) 
initialHandle);
 
                // 2. Clip the base DB instance
                try {
@@ -416,7 +410,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends 
AbstractRocksDBRestor
        }
 
        private RestoredDBInstance restoreDBInstanceFromStateHandle(
-               IncrementalKeyedStateHandle restoreStateHandle,
+               IncrementalRemoteKeyedStateHandle restoreStateHandle,
                Path temporaryRestoreInstancePath) throws Exception {
 
                try (RocksDBStateDownloader rocksDBStateDownloader =
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
index 3ede377..ed87024 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
@@ -32,8 +32,8 @@ import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
 import org.apache.flink.runtime.state.DirectoryStateHandle;
-import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
 import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
+import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.KeyedStateHandle;
@@ -318,8 +318,8 @@ public class RocksIncrementalSnapshotStrategy<K> extends 
RocksDBSnapshotStrategy
                                        materializedSstFiles.put(checkpointId, 
sstFiles.keySet());
                                }
 
-                               final IncrementalKeyedStateHandle 
jmIncrementalKeyedStateHandle =
-                                       new IncrementalKeyedStateHandle(
+                               final IncrementalRemoteKeyedStateHandle 
jmIncrementalKeyedStateHandle =
+                                       new IncrementalRemoteKeyedStateHandle(
                                                backendUID,
                                                keyGroupRange,
                                                checkpointId,
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index b681a87..9bb3dc0 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -31,7 +31,7 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.LocalRecoveryConfig;
@@ -495,7 +495,7 @@ public class RocksDBStateBackendTest extends 
StateBackendTestBase<RocksDBStateBa
                                ValueState<String> state =
                                        
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
 
-                               Queue<IncrementalKeyedStateHandle> 
previousStateHandles = new LinkedList<>();
+                               Queue<IncrementalRemoteKeyedStateHandle> 
previousStateHandles = new LinkedList<>();
                                SharedStateRegistry sharedStateRegistry = 
spy(new SharedStateRegistry());
                                for (int checkpointId = 0; checkpointId < 3; 
++checkpointId) {
 
@@ -514,8 +514,8 @@ public class RocksDBStateBackendTest extends 
StateBackendTestBase<RocksDBStateBa
 
                                        SnapshotResult<KeyedStateHandle> 
snapshotResult = snapshot.get();
 
-                                       IncrementalKeyedStateHandle stateHandle 
=
-                                               (IncrementalKeyedStateHandle) 
snapshotResult.getJobManagerOwnedSnapshot();
+                                       IncrementalRemoteKeyedStateHandle 
stateHandle =
+                                               
(IncrementalRemoteKeyedStateHandle) snapshotResult.getJobManagerOwnedSnapshot();
 
                                        Map<StateHandleID, StreamStateHandle> 
sharedState =
                                                new 
HashMap<>(stateHandle.getSharedState());
@@ -551,7 +551,7 @@ public class RocksDBStateBackendTest extends 
StateBackendTestBase<RocksDBStateBa
                }
        }
 
-       private void checkRemove(IncrementalKeyedStateHandle remove, 
SharedStateRegistry registry) throws Exception {
+       private void checkRemove(IncrementalRemoteKeyedStateHandle remove, 
SharedStateRegistry registry) throws Exception {
                for (StateHandleID id : remove.getSharedState().keySet()) {
                        verify(registry, times(0)).unregisterReference(
                                
remove.createSharedStateRegistryKeyFromFileName(id));
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java
index f99beca..76372b7 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.contrib.streaming.state;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StreamStateHandle;
@@ -79,8 +79,8 @@ public class RocksDBStateDownloaderTest extends TestLogger {
                Map<StateHandleID, StreamStateHandle> stateHandles = new 
HashMap<>(1);
                stateHandles.put(new StateHandleID("state1"), stateHandle);
 
-               IncrementalKeyedStateHandle incrementalKeyedStateHandle =
-                       new IncrementalKeyedStateHandle(
+               IncrementalRemoteKeyedStateHandle incrementalKeyedStateHandle =
+                       new IncrementalRemoteKeyedStateHandle(
                                UUID.randomUUID(),
                                KeyGroupRange.EMPTY_KEY_GROUP_RANGE,
                                1,
@@ -124,8 +124,8 @@ public class RocksDBStateDownloaderTest extends TestLogger {
                        privateStates.put(new 
StateHandleID(String.format("privateState%d", i)), handles.get(i));
                }
 
-               IncrementalKeyedStateHandle incrementalKeyedStateHandle =
-                       new IncrementalKeyedStateHandle(
+               IncrementalRemoteKeyedStateHandle incrementalKeyedStateHandle =
+                       new IncrementalRemoteKeyedStateHandle(
                                UUID.randomUUID(),
                                KeyGroupRange.of(0, 1),
                                1,

Reply via email to