This is an automated email from the ASF dual-hosted git repository.

gaborgsomogyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 02110ceb58d [FLINK-36001][state] Store operator name and UID in state metadata
02110ceb58d is described below

commit 02110ceb58dc6838ac965477d174c6760eff53f2
Author: Gabor Somogyi <[email protected]>
AuthorDate: Fri Sep 13 15:13:41 2024 +0200

    [FLINK-36001][state] Store operator name and UID in state metadata
---
 .../apache/flink/state/api/SavepointWriter.java    |  22 ++--
 .../state/api/StateBootstrapTransformation.java    |  35 +++---
 .../api/output/OperatorSubtaskStateReducer.java    |  17 ++-
 .../StateBootstrapTransformationWithID.java        |  12 +-
 .../api/runtime/metadata/OperatorStateSpecV2.java  |  17 ++-
 .../api/runtime/metadata/SavepointMetadataV2.java  |   2 +-
 .../api/SavepointWriterUidModificationITCase.java  | 126 ++++++++++++++++++---
 .../api/StateBootstrapTransformationTest.java      |  11 +-
 .../api/input/BroadcastStateInputFormatTest.java   |   3 +-
 .../state/api/input/KeyedStateInputFormatTest.java |  14 +--
 .../state/api/input/ListStateInputFormatTest.java  |   3 +-
 .../input/StreamOperatorContextBuilderTest.java    |   2 +-
 .../state/api/input/UnionStateInputFormatTest.java |   3 +-
 .../flink/state/api/input/WindowReaderTest.java    |   2 +-
 .../api/output/SavepointOutputFormatTest.java      |   3 +-
 .../org/apache/flink/runtime/OperatorIDPair.java   |  38 ++++++-
 .../flink/runtime/checkpoint/Checkpoints.java      |   4 +-
 .../runtime/checkpoint/DefaultCheckpointPlan.java  |  12 +-
 .../checkpoint/FullyFinishedOperatorState.java     |  19 +++-
 .../flink/runtime/checkpoint/OperatorState.java    |  48 +++++++-
 .../runtime/checkpoint/PendingCheckpoint.java      |  22 ++--
 .../checkpoint/StateAssignmentOperation.java       |   2 +
 .../checkpoint/metadata/MetadataSerializers.java   |   3 +-
 .../checkpoint/metadata/MetadataV2Serializer.java  |   3 +-
 .../checkpoint/metadata/MetadataV3Serializer.java  |  13 ++-
 .../checkpoint/metadata/MetadataV4Serializer.java  |   9 +-
 .../checkpoint/metadata/MetadataV5Serializer.java  | 121 ++++++++++++++++++++
 .../api/graph/StreamingJobGraphGenerator.java      |  50 ++++++--
 .../apache/flink/runtime/OperatorIDPairTest.java   |  28 +++++
 .../CheckpointCoordinatorRestoringTest.java        |   8 +-
 .../checkpoint/CheckpointMetadataLoadingTest.java  |   5 +-
 .../checkpoint/CheckpointStateRestoreTest.java     |   4 +-
 .../checkpoint/CompletedCheckpointStoreTest.java   |   3 +-
 .../checkpoint/DefaultCheckpointPlanTest.java      |   6 +-
 .../checkpoint/FullyFinishedOperatorStateTest.java |   5 +-
 .../runtime/checkpoint/PendingCheckpointTest.java  |   4 +-
 .../checkpoint/StateAssignmentOperationTest.java   |  29 ++---
 .../checkpoint/VertexFinishedStateCheckerTest.java |  12 +-
 .../checkpoint/metadata/CheckpointTestUtils.java   |  14 ++-
 .../metadata/MetadataV5SerializerTest.java         | 101 +++++++++++++++++
 .../apache/flink/runtime/jobmaster/TestUtils.java  |   2 +-
 .../OperatorCoordinatorSchedulerTest.java          |   5 +-
 .../runtime/scheduler/SchedulerUtilsTest.java      |   2 +-
 .../runtime/state/SharedStateRegistryTest.java     |   2 +-
 44 files changed, 684 insertions(+), 162 deletions(-)

diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointWriter.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointWriter.java
index 01f51ca997a..f8b8d464f2d 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointWriter.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointWriter.java
@@ -377,7 +377,7 @@ public class SavepointWriter {
                                 newOperatorState
                                         .getBootstrapTransformation()
                                         .writeOperatorState(
-                                                
newOperatorState.getOperatorID(),
+                                                
newOperatorState.getOperatorIdentifier(),
                                                 stateBackend,
                                                 config,
                                                 metadata.getMaxParallelism(),
@@ -402,16 +402,24 @@ public class SavepointWriter {
                     value.getOperatorStates().stream()
                             .map(
                                     operatorState -> {
-                                        OperatorIdentifier operatorIdentifier =
-                                                OperatorIdentifier.forUidHash(
-                                                        operatorState
-                                                                
.getOperatorID()
-                                                                
.toHexString());
+                                        OperatorIdentifier operatorIdentifier;
+                                        if 
(operatorState.getOperatorUid().isPresent()) {
+                                            operatorIdentifier =
+                                                    OperatorIdentifier.forUid(
+                                                            
operatorState.getOperatorUid().get());
+                                        } else {
+                                            operatorIdentifier =
+                                                    
OperatorIdentifier.forUidHash(
+                                                            operatorState
+                                                                    
.getOperatorID()
+                                                                    
.toHexString());
+                                        }
 
                                         final OperatorIdentifier 
transformedIdentifier =
                                                 
uidTransformationMap.remove(operatorIdentifier);
                                         if (transformedIdentifier != null) {
-                                            return 
operatorState.copyWithNewOperatorID(
+                                            return 
operatorState.copyWithNewIDs(
+                                                    
transformedIdentifier.getUid().orElse(null),
                                                     
transformedIdentifier.getOperatorId());
                                         }
                                         return operatorState;
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/StateBootstrapTransformation.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/StateBootstrapTransformation.java
index ddc83a4adf9..6e6f55bcc62 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/StateBootstrapTransformation.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/StateBootstrapTransformation.java
@@ -29,7 +29,6 @@ import org.apache.flink.core.execution.CheckpointingMode;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.ManagedMemoryUseCase;
 import org.apache.flink.runtime.checkpoint.OperatorState;
-import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.state.api.output.BootstrapStreamTaskRunner;
 import org.apache.flink.state.api.output.OperatorSubtaskStateReducer;
@@ -115,7 +114,7 @@ public class StateBootstrapTransformation<T> {
     }
 
     /**
-     * @param operatorID The operator id for the stream operator.
+     * @param operatorIdentifier The operator identifier for the stream 
operator.
      * @param stateBackend The state backend for the job.
      * @param config Additional configurations applied to the bootstrap stream 
tasks.
      * @param globalMaxParallelism Global max parallelism set for the 
savepoint.
@@ -123,7 +122,7 @@ public class StateBootstrapTransformation<T> {
      * @return The operator subtask states for this bootstrap transformation.
      */
     DataStream<OperatorState> writeOperatorState(
-            OperatorID operatorID,
+            OperatorIdentifier operatorIdentifier,
             StateBackend stateBackend,
             Configuration config,
             int globalMaxParallelism,
@@ -131,27 +130,36 @@ public class StateBootstrapTransformation<T> {
         int localMaxParallelism = getMaxParallelism(globalMaxParallelism);
 
         return writeOperatorSubtaskStates(
-                        operatorID, stateBackend, config, savepointPath, 
localMaxParallelism)
+                        operatorIdentifier,
+                        stateBackend,
+                        config,
+                        savepointPath,
+                        localMaxParallelism)
                 .transform(
                         "reduce(OperatorSubtaskState)",
                         TypeInformation.of(OperatorState.class),
                         new GroupReduceOperator<>(
-                                new OperatorSubtaskStateReducer(operatorID, 
localMaxParallelism)))
+                                new OperatorSubtaskStateReducer(
+                                        operatorIdentifier, 
localMaxParallelism)))
                 .forceNonParallel();
     }
 
     @VisibleForTesting
     SingleOutputStreamOperator<TaggedOperatorSubtaskState> 
writeOperatorSubtaskStates(
-            OperatorID operatorID,
+            OperatorIdentifier operatorIdentifier,
             StateBackend stateBackend,
             Path savepointPath,
             int localMaxParallelism) {
         return writeOperatorSubtaskStates(
-                operatorID, stateBackend, new Configuration(), savepointPath, 
localMaxParallelism);
+                operatorIdentifier,
+                stateBackend,
+                new Configuration(),
+                savepointPath,
+                localMaxParallelism);
     }
 
     private SingleOutputStreamOperator<TaggedOperatorSubtaskState> 
writeOperatorSubtaskStates(
-            OperatorID operatorID,
+            OperatorIdentifier operatorIdentifier,
             StateBackend stateBackend,
             Configuration additionalConfig,
             Path savepointPath,
@@ -161,7 +169,8 @@ public class StateBootstrapTransformation<T> {
 
         operator = stream.getExecutionEnvironment().clean(operator);
 
-        final StreamConfig config = getConfig(operatorID, stateBackend, 
additionalConfig, operator);
+        final StreamConfig config =
+                getConfig(operatorIdentifier, stateBackend, additionalConfig, 
operator);
 
         BootstrapStreamTaskRunner<T> operatorRunner =
                 new BootstrapStreamTaskRunner<>(config, localMaxParallelism);
@@ -173,7 +182,7 @@ public class StateBootstrapTransformation<T> {
 
         SingleOutputStreamOperator<TaggedOperatorSubtaskState> subtaskStates =
                 input.transform(
-                                operatorID.toHexString(),
+                                
operatorIdentifier.getOperatorId().toHexString(),
                                 
TypeInformation.of(TaggedOperatorSubtaskState.class),
                                 operatorRunner)
                         .setMaxParallelism(localMaxParallelism);
@@ -191,7 +200,7 @@ public class StateBootstrapTransformation<T> {
 
     @VisibleForTesting
     StreamConfig getConfig(
-            OperatorID operatorID,
+            OperatorIdentifier operatorIdentifier,
             StateBackend stateBackend,
             Configuration additionalConfig,
             StreamOperator<TaggedOperatorSubtaskState> operator) {
@@ -217,8 +226,8 @@ public class StateBootstrapTransformation<T> {
         }
 
         config.setStreamOperator(operator);
-        config.setOperatorName(operatorID.toHexString());
-        config.setOperatorID(operatorID);
+        
config.setOperatorName(operatorIdentifier.getOperatorId().toHexString());
+        config.setOperatorID(operatorIdentifier.getOperatorId());
         config.setStateBackend(stateBackend);
         
config.setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase.STATE_BACKEND,
 1.0);
         config.serializeAllConfigs();
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/OperatorSubtaskStateReducer.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/OperatorSubtaskStateReducer.java
index 24aecb3acb4..464833cb0be 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/OperatorSubtaskStateReducer.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/OperatorSubtaskStateReducer.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.runtime.checkpoint.OperatorState;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
-import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.state.api.OperatorIdentifier;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
 
@@ -41,15 +41,15 @@ public class OperatorSubtaskStateReducer
 
     private static final long serialVersionUID = 1L;
 
-    private final OperatorID operatorID;
+    private final OperatorIdentifier operatorIdentifier;
 
     private final int maxParallelism;
 
-    public OperatorSubtaskStateReducer(OperatorID operatorID, int 
maxParallelism) {
-        Preconditions.checkNotNull(operatorID, "Operator id must not be 
null.");
+    public OperatorSubtaskStateReducer(OperatorIdentifier operatorIdentifier, 
int maxParallelism) {
+        Preconditions.checkNotNull(operatorIdentifier, "Operator identifier 
must not be null.");
         Preconditions.checkState(maxParallelism > 1);
 
-        this.operatorID = operatorID;
+        this.operatorIdentifier = operatorIdentifier;
         this.maxParallelism = maxParallelism;
     }
 
@@ -64,7 +64,12 @@ public class OperatorSubtaskStateReducer
                 StreamSupport.stream(values.spliterator(), 
false).collect(Collectors.toList());
 
         OperatorState operatorState =
-                new OperatorState(operatorID, subtasks.size(), maxParallelism);
+                new OperatorState(
+                        "ReducedOperatorState",
+                        operatorIdentifier.getUid().orElse(null),
+                        operatorIdentifier.getOperatorId(),
+                        subtasks.size(),
+                        maxParallelism);
 
         for (TaggedOperatorSubtaskState value : subtasks) {
             operatorState.putState(value.index, value.state);
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/StateBootstrapTransformationWithID.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/StateBootstrapTransformationWithID.java
index 2dabf1abe45..7a2dcfb54e0 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/StateBootstrapTransformationWithID.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/StateBootstrapTransformationWithID.java
@@ -20,6 +20,7 @@ package org.apache.flink.state.api.runtime;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.state.api.OperatorIdentifier;
 import org.apache.flink.state.api.StateBootstrapTransformation;
 import org.apache.flink.util.Preconditions;
 
@@ -31,17 +32,18 @@ import org.apache.flink.util.Preconditions;
 @Internal
 public class StateBootstrapTransformationWithID<T> {
 
-    private final OperatorID operatorID;
+    private final OperatorIdentifier operatorIdentifier;
     private final StateBootstrapTransformation<T> bootstrapTransformation;
 
     public StateBootstrapTransformationWithID(
-            OperatorID operatorID, StateBootstrapTransformation<T> 
bootstrapTransformation) {
-        this.operatorID = Preconditions.checkNotNull(operatorID);
+            OperatorIdentifier operatorIdentifier,
+            StateBootstrapTransformation<T> bootstrapTransformation) {
+        this.operatorIdentifier = 
Preconditions.checkNotNull(operatorIdentifier);
         this.bootstrapTransformation = 
Preconditions.checkNotNull(bootstrapTransformation);
     }
 
-    public OperatorID getOperatorID() {
-        return operatorID;
+    public OperatorIdentifier getOperatorIdentifier() {
+        return operatorIdentifier;
     }
 
     public StateBootstrapTransformation<T> getBootstrapTransformation() {
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/OperatorStateSpecV2.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/OperatorStateSpecV2.java
index ff6a98cd3af..dd339e3dcdc 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/OperatorStateSpecV2.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/OperatorStateSpecV2.java
@@ -20,7 +20,7 @@ package org.apache.flink.state.api.runtime.metadata;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.checkpoint.OperatorState;
-import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.state.api.OperatorIdentifier;
 import org.apache.flink.state.api.runtime.StateBootstrapTransformationWithID;
 import org.apache.flink.util.Preconditions;
 
@@ -34,7 +34,7 @@ import javax.annotation.Nullable;
 @Internal
 class OperatorStateSpecV2 {
 
-    private final OperatorID id;
+    private final OperatorIdentifier identifier;
 
     @Nullable private final OperatorState existingState;
 
@@ -50,13 +50,18 @@ class OperatorStateSpecV2 {
     }
 
     private OperatorStateSpecV2(OperatorState existingState) {
-        this.id = existingState.getOperatorID();
+        if (existingState.getOperatorUid().isPresent()) {
+            this.identifier = 
OperatorIdentifier.forUid(existingState.getOperatorUid().get());
+        } else {
+            this.identifier =
+                    
OperatorIdentifier.forUidHash(existingState.getOperatorID().toHexString());
+        }
         this.existingState = existingState;
         this.newOperatorStateTransformation = null;
     }
 
     private OperatorStateSpecV2(StateBootstrapTransformationWithID<?> 
transformation) {
-        this.id = transformation.getOperatorID();
+        this.identifier = transformation.getOperatorIdentifier();
         this.newOperatorStateTransformation = transformation;
         this.existingState = null;
     }
@@ -71,7 +76,7 @@ class OperatorStateSpecV2 {
 
     OperatorState asExistingState() {
         Preconditions.checkState(
-                isExistingState(), "OperatorState %s is not an existing 
state.", id);
+                isExistingState(), "OperatorState %s is not an existing 
state.", identifier);
         return existingState;
     }
 
@@ -80,7 +85,7 @@ class OperatorStateSpecV2 {
         Preconditions.checkState(
                 isNewStateTransformation(),
                 "OperatorState %s is not a new state defined with 
BootstrapTransformation",
-                id);
+                identifier);
         return (StateBootstrapTransformationWithID<T>) 
newOperatorStateTransformation;
     }
 }
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadataV2.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadataV2.java
index 72cfa27781f..08c40d54763 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadataV2.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadataV2.java
@@ -119,7 +119,7 @@ public class SavepointMetadataV2 {
         operatorStateIndex.put(
                 id,
                 OperatorStateSpecV2.newWithTransformation(
-                        new StateBootstrapTransformationWithID<>(id, 
transformation)));
+                        new StateBootstrapTransformationWithID<>(identifier, 
transformation)));
     }
 
     /** @return List of {@link OperatorState} that already exists within the 
savepoint. */
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterUidModificationITCase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterUidModificationITCase.java
index 1b6b665d687..dcd7f32a0c2 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterUidModificationITCase.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterUidModificationITCase.java
@@ -24,10 +24,13 @@ import 
org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
+import org.apache.flink.state.api.runtime.SavepointLoader;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.test.junit5.MiniClusterExtension;
@@ -43,8 +46,10 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.Set;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -85,7 +90,7 @@ public class SavepointWriterUidModificationITCase {
                                         OperatorIdentifier.forUidHash(uidHash),
                                         OperatorIdentifier.forUid(uid)));
 
-        runAndValidate(newSavepoint, Tuple2.of(STATE_1, uid));
+        runAndValidate(newSavepoint, ValidationParameters.of(STATE_1, uid, 
null));
     }
 
     @Test
@@ -107,7 +112,29 @@ public class SavepointWriterUidModificationITCase {
                                         OperatorIdentifier.forUid(uid),
                                         OperatorIdentifier.forUid(newUid)));
 
-        runAndValidate(newSavepoint, Tuple2.of(STATE_1, newUid));
+        runAndValidate(newSavepoint, ValidationParameters.of(STATE_1, newUid, 
null));
+    }
+
+    @Test
+    public void testChangeUidHashOnly(@TempDir Path tmp) throws Exception {
+        final String uid = "uid";
+        final String newUidHash = new AbstractID().toHexString();
+        final String originalSavepoint =
+                bootstrapState(
+                        tmp,
+                        (env, writer) ->
+                                writer.withOperator(
+                                        OperatorIdentifier.forUid(uid), 
bootstrap(env, STATE_1)));
+        final String newSavepoint =
+                modifySavepoint(
+                        tmp,
+                        originalSavepoint,
+                        writer ->
+                                writer.changeOperatorIdentifier(
+                                        OperatorIdentifier.forUid(uid),
+                                        
OperatorIdentifier.forUidHash(newUidHash)));
+
+        runAndValidate(newSavepoint, ValidationParameters.of(STATE_1, null, 
newUidHash));
     }
 
     @Test
@@ -136,7 +163,10 @@ public class SavepointWriterUidModificationITCase {
                                                 
OperatorIdentifier.forUid(uid2),
                                                 
OperatorIdentifier.forUid(uid1)));
 
-        runAndValidate(newSavepoint, Tuple2.of(STATE_1, uid2), 
Tuple2.of(STATE_2, uid1));
+        runAndValidate(
+                newSavepoint,
+                ValidationParameters.of(STATE_1, uid2, null),
+                ValidationParameters.of(STATE_2, uid1, null));
     }
 
     private static String bootstrapState(
@@ -184,21 +214,57 @@ public class SavepointWriterUidModificationITCase {
         return newSavepointPath;
     }
 
-    @SafeVarargs
     private static void runAndValidate(
-            String savepointPath, Tuple2<Collection<Integer>, String>... 
assertions)
-            throws Exception {
+            String savepointPath, ValidationParameters... 
validationParameters) throws Exception {
+        // validate metadata
+        CheckpointMetadata metadata = 
SavepointLoader.loadSavepointMetadata(savepointPath);
+        
assertThat(metadata.getOperatorStates().size()).isEqualTo(validationParameters.length);
+        for (ValidationParameters validationParameter : validationParameters) {
+            if (validationParameter.getUid() != null) {
+                Set<OperatorState> operators =
+                        metadata.getOperatorStates().stream()
+                                .filter(
+                                        os ->
+                                                os.getOperatorUid().isPresent()
+                                                        && os.getOperatorUid()
+                                                                .get()
+                                                                .equals(
+                                                                        
validationParameter
+                                                                               
 .getUid()))
+                                .collect(Collectors.toSet());
+                assertThat(operators.size()).isEqualTo(1);
+                assertThat(operators.iterator().next().getOperatorID())
+                        .isEqualTo(
+                                
OperatorIdentifier.forUid(validationParameter.getUid())
+                                        .getOperatorId());
+            } else {
+                Set<OperatorState> operators =
+                        metadata.getOperatorStates().stream()
+                                .filter(
+                                        os ->
+                                                os.getOperatorID()
+                                                        .toHexString()
+                                                        
.equals(validationParameter.getUidHash()))
+                                .collect(Collectors.toSet());
+                assertThat(operators.size()).isEqualTo(1);
+                
assertThat(operators.iterator().next().getOperatorUid()).isEmpty();
+            }
+        }
+
         final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
         // prepare collection of state
         final List<CloseableIterator<Integer>> iterators = new ArrayList<>();
-        for (Tuple2<Collection<Integer>, String> assertion : assertions) {
-            iterators.add(
-                    env.fromData(assertion.f0)
+        for (ValidationParameters validationParameter : validationParameters) {
+            SingleOutputStreamOperator<Integer> stream =
+                    env.fromData(validationParameter.getState())
                             .keyBy(v -> v)
-                            .map(new StateReader())
-                            .uid(assertion.f1)
-                            .collectAsync());
+                            .map(new StateReader());
+            if (validationParameter.getUid() != null) {
+                
iterators.add(stream.uid(validationParameter.getUid()).collectAsync());
+            } else {
+                
iterators.add(stream.setUidHash(validationParameter.getUidHash()).collectAsync());
+            }
         }
 
         // run job
@@ -208,10 +274,10 @@ public class SavepointWriterUidModificationITCase {
         env.executeAsync(streamGraph);
 
         // validate state
-        for (int i = 0; i < assertions.length; i++) {
+        for (int i = 0; i < validationParameters.length; i++) {
             assertThat(iterators.get(i))
                     .toIterable()
-                    .containsExactlyInAnyOrderElementsOf(assertions[i].f0);
+                    
.containsExactlyInAnyOrderElementsOf(validationParameters[i].getState());
         }
 
         for (CloseableIterator<Integer> iterator : iterators) {
@@ -219,6 +285,36 @@ public class SavepointWriterUidModificationITCase {
         }
     }
 
+    private static class ValidationParameters {
+        private final Collection<Integer> state;
+        private final String uid;
+        private final String uidHash;
+
+        public ValidationParameters(
+                final Collection<Integer> state, final String uid, final 
String uidHash) {
+            this.state = state;
+            this.uid = uid;
+            this.uidHash = uidHash;
+        }
+
+        public Collection<Integer> getState() {
+            return state;
+        }
+
+        public String getUid() {
+            return uid;
+        }
+
+        public String getUidHash() {
+            return uidHash;
+        }
+
+        public static ValidationParameters of(
+                final Collection<Integer> state, final String uid, final 
String uidHash) {
+            return new ValidationParameters(state, uid, uidHash);
+        }
+    }
+
     /** A savepoint writer function. */
     public static class StateBootstrapper extends 
KeyedStateBootstrapFunction<Integer, Integer> {
         private transient ValueState<Integer> state;
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/StateBootstrapTransformationTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/StateBootstrapTransformationTest.java
index 776ab093db3..5928c809c6d 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/StateBootstrapTransformationTest.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/StateBootstrapTransformationTest.java
@@ -28,7 +28,6 @@ import 
org.apache.flink.state.api.functions.BroadcastStateBootstrapFunction;
 import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
 import org.apache.flink.state.api.functions.StateBootstrapFunction;
 import org.apache.flink.state.api.output.TaggedOperatorSubtaskState;
-import org.apache.flink.state.api.runtime.OperatorIDGenerator;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -54,7 +53,7 @@ public class StateBootstrapTransformationTest extends 
AbstractTestBaseJUnit4 {
         int maxParallelism = transformation.getMaxParallelism(4);
         DataStream<TaggedOperatorSubtaskState> result =
                 transformation.writeOperatorSubtaskStates(
-                        OperatorIDGenerator.fromUid("uid"),
+                        OperatorIdentifier.forUid("uid"),
                         new HashMapStateBackend(),
                         new Path(),
                         maxParallelism);
@@ -79,7 +78,7 @@ public class StateBootstrapTransformationTest extends 
AbstractTestBaseJUnit4 {
         int maxParallelism = transformation.getMaxParallelism(10);
         DataStream<TaggedOperatorSubtaskState> result =
                 transformation.writeOperatorSubtaskStates(
-                        OperatorIDGenerator.fromUid("uid"),
+                        OperatorIdentifier.forUid("uid"),
                         new HashMapStateBackend(),
                         new Path(),
                         maxParallelism);
@@ -104,7 +103,7 @@ public class StateBootstrapTransformationTest extends 
AbstractTestBaseJUnit4 {
         int maxParallelism = transformation.getMaxParallelism(4);
         DataStream<TaggedOperatorSubtaskState> result =
                 transformation.writeOperatorSubtaskStates(
-                        OperatorIDGenerator.fromUid("uid"),
+                        OperatorIdentifier.forUid("uid"),
                         new HashMapStateBackend(),
                         new Path(),
                         maxParallelism);
@@ -130,7 +129,7 @@ public class StateBootstrapTransformationTest extends 
AbstractTestBaseJUnit4 {
         int maxParallelism = transformation.getMaxParallelism(4);
         DataStream<TaggedOperatorSubtaskState> result =
                 transformation.writeOperatorSubtaskStates(
-                        OperatorIDGenerator.fromUid("uid"),
+                        OperatorIdentifier.forUid("uid"),
                         new HashMapStateBackend(),
                         new Path(),
                         maxParallelism);
@@ -153,7 +152,7 @@ public class StateBootstrapTransformationTest extends 
AbstractTestBaseJUnit4 {
 
         StreamConfig config =
                 transformation.getConfig(
-                        OperatorIDGenerator.fromUid("uid"),
+                        OperatorIdentifier.forUid("uid"),
                         new HashMapStateBackend(),
                         new Configuration(),
                         null);
diff --git 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/BroadcastStateInputFormatTest.java
 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/BroadcastStateInputFormatTest.java
index 83ec5f4cbaf..62933ef7f7a 100644
--- 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/BroadcastStateInputFormatTest.java
+++ 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/BroadcastStateInputFormatTest.java
@@ -57,7 +57,8 @@ public class BroadcastStateInputFormatTest {
             testHarness.processElement2(new StreamRecord<>(3));
 
             OperatorSubtaskState subtaskState = testHarness.snapshot(0, 0);
-            OperatorState state = new 
OperatorState(OperatorIDGenerator.fromUid("uid"), 1, 4);
+            OperatorState state =
+                    new OperatorState(null, null, 
OperatorIDGenerator.fromUid("uid"), 1, 4);
             state.putState(0, subtaskState);
 
             OperatorStateInputSplit split =
diff --git 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/KeyedStateInputFormatTest.java
 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/KeyedStateInputFormatTest.java
index ee129d00fc0..c287cdb5598 100644
--- 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/KeyedStateInputFormatTest.java
+++ 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/KeyedStateInputFormatTest.java
@@ -66,7 +66,7 @@ public class KeyedStateInputFormatTest {
 
         OperatorSubtaskState state =
                 createOperatorSubtaskState(new StreamFlatMap<>(new 
StatefulFunction()));
-        OperatorState operatorState = new OperatorState(operatorID, 1, 128);
+        OperatorState operatorState = new OperatorState(null, null, 
operatorID, 1, 128);
         operatorState.putState(0, state);
 
         KeyedStateInputFormat<?, ?, ?> format =
@@ -87,7 +87,7 @@ public class KeyedStateInputFormatTest {
 
         OperatorSubtaskState state =
                 createOperatorSubtaskState(new StreamFlatMap<>(new 
StatefulFunction()));
-        OperatorState operatorState = new OperatorState(operatorID, 1, 128);
+        OperatorState operatorState = new OperatorState(null, null, 
operatorID, 1, 128);
         operatorState.putState(0, state);
 
         KeyedStateInputFormat<?, ?, ?> format =
@@ -110,7 +110,7 @@ public class KeyedStateInputFormatTest {
 
         OperatorSubtaskState state =
                 createOperatorSubtaskState(new StreamFlatMap<>(new 
StatefulFunction()));
-        OperatorState operatorState = new OperatorState(operatorID, 1, 128);
+        OperatorState operatorState = new OperatorState(null, null, 
operatorID, 1, 128);
         operatorState.putState(0, state);
 
         KeyedStateInputFormat<?, ?, ?> format =
@@ -135,7 +135,7 @@ public class KeyedStateInputFormatTest {
 
         OperatorSubtaskState state =
                 createOperatorSubtaskState(new StreamFlatMap<>(new 
StatefulFunction()));
-        OperatorState operatorState = new OperatorState(operatorID, 1, 128);
+        OperatorState operatorState = new OperatorState(null, null, 
operatorID, 1, 128);
         operatorState.putState(0, state);
 
         KeyedStateInputFormat<?, ?, ?> format =
@@ -161,7 +161,7 @@ public class KeyedStateInputFormatTest {
 
         OperatorSubtaskState state =
                 createOperatorSubtaskState(new StreamFlatMap<>(new 
StatefulFunction()));
-        OperatorState operatorState = new OperatorState(operatorID, 1, 128);
+        OperatorState operatorState = new OperatorState(null, null, 
operatorID, 1, 128);
         operatorState.putState(0, state);
 
         KeyedStateInputFormat<?, ?, ?> format =
@@ -187,7 +187,7 @@ public class KeyedStateInputFormatTest {
         OperatorSubtaskState state =
                 createOperatorSubtaskState(
                         new KeyedProcessOperator<>(new 
StatefulFunctionWithTime()));
-        OperatorState operatorState = new OperatorState(operatorID, 1, 128);
+        OperatorState operatorState = new OperatorState(null, null, 
operatorID, 1, 128);
         operatorState.putState(0, state);
 
         KeyedStateInputFormat<?, ?, ?> format =
@@ -213,7 +213,7 @@ public class KeyedStateInputFormatTest {
             throws IOException {
         KeyedStateInputFormat<Integer, VoidNamespace, Integer> format =
                 new KeyedStateInputFormat<>(
-                        new OperatorState(OperatorIDGenerator.fromUid("uid"), 
1, 4),
+                        new OperatorState(null, null, 
OperatorIDGenerator.fromUid("uid"), 1, 4),
                         new MemoryStateBackend(),
                         new Configuration(),
                         new KeyedStateReaderOperator<>(userFunction, 
Types.INT),
diff --git 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/ListStateInputFormatTest.java
 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/ListStateInputFormatTest.java
index f9b0d273448..6e837d80997 100644
--- 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/ListStateInputFormatTest.java
+++ 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/ListStateInputFormatTest.java
@@ -59,7 +59,8 @@ public class ListStateInputFormatTest {
             testHarness.processElement(3, 0);
 
             OperatorSubtaskState subtaskState = testHarness.snapshot(0, 0);
-            OperatorState state = new 
OperatorState(OperatorIDGenerator.fromUid("uid"), 1, 4);
+            OperatorState state =
+                    new OperatorState(null, null, 
OperatorIDGenerator.fromUid("uid"), 1, 4);
             state.putState(0, subtaskState);
 
             OperatorStateInputSplit split =
diff --git 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/StreamOperatorContextBuilderTest.java
 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/StreamOperatorContextBuilderTest.java
index 939d0f5a7df..93e6a05464d 100644
--- 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/StreamOperatorContextBuilderTest.java
+++ 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/StreamOperatorContextBuilderTest.java
@@ -50,7 +50,7 @@ public class StreamOperatorContextBuilderTest {
                 new StreamOperatorContextBuilder(
                         context,
                         configuration,
-                        new OperatorState(new OperatorID(), 1, 128),
+                        new OperatorState(null, null, new OperatorID(), 1, 
128),
                         new PrioritizedOperatorSubtaskStateInputSplit() {
                             @Override
                             public PrioritizedOperatorSubtaskState
diff --git 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/UnionStateInputFormatTest.java
 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/UnionStateInputFormatTest.java
index 828c787deda..215d2c40325 100644
--- 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/UnionStateInputFormatTest.java
+++ 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/UnionStateInputFormatTest.java
@@ -60,7 +60,8 @@ public class UnionStateInputFormatTest {
             testHarness.processElement(3, 0);
 
             OperatorSubtaskState subtaskState = testHarness.snapshot(0, 0);
-            OperatorState state = new 
OperatorState(OperatorIDGenerator.fromUid("uid"), 1, 4);
+            OperatorState state =
+                    new OperatorState(null, null, 
OperatorIDGenerator.fromUid("uid"), 1, 4);
             state.putState(0, subtaskState);
 
             OperatorStateInputSplit split =
diff --git 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/WindowReaderTest.java
 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/WindowReaderTest.java
index b465b024880..355f28aa532 100644
--- 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/WindowReaderTest.java
+++ 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/WindowReaderTest.java
@@ -248,7 +248,7 @@ public class WindowReaderTest {
         harness.close();
 
         OperatorID operatorID = OperatorIDGenerator.fromUid(UID);
-        OperatorState operatorState = new OperatorState(operatorID, 1, 
MAX_PARALLELISM);
+        OperatorState operatorState = new OperatorState(null, null, 
operatorID, 1, MAX_PARALLELISM);
         operatorState.putState(0, state);
         return operatorState;
     }
diff --git 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SavepointOutputFormatTest.java
 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SavepointOutputFormatTest.java
index 51346556e68..bac5fddd70c 100644
--- 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SavepointOutputFormatTest.java
+++ 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SavepointOutputFormatTest.java
@@ -77,7 +77,8 @@ public class SavepointOutputFormatTest {
     }
 
     private CheckpointMetadata createSavepoint() {
-        OperatorState operatorState = new 
OperatorState(OperatorIDGenerator.fromUid("uid"), 1, 128);
+        OperatorState operatorState =
+                new OperatorState(null, null, 
OperatorIDGenerator.fromUid("uid"), 1, 128);
 
         operatorState.putState(0, OperatorSubtaskState.builder().build());
         return new CheckpointMetadata(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/OperatorIDPair.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/OperatorIDPair.java
index 498d68d8e8e..7f3a042de33 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/OperatorIDPair.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/OperatorIDPair.java
@@ -31,28 +31,56 @@ public class OperatorIDPair implements Serializable {
     private static final long serialVersionUID = 1L;
 
     private final OperatorID generatedOperatorID;
-    private final OperatorID userDefinedOperatorID;
+    @Nullable private final OperatorID userDefinedOperatorID;
+    @Nullable private final String userDefinedOperatorName;
+    @Nullable private final String userDefinedOperatorUid;
 
     private OperatorIDPair(
-            OperatorID generatedOperatorID, @Nullable OperatorID 
userDefinedOperatorID) {
+            OperatorID generatedOperatorID,
+            @Nullable OperatorID userDefinedOperatorID,
+            @Nullable String userDefinedOperatorName,
+            @Nullable String userDefinedOperatorUid) {
         this.generatedOperatorID = generatedOperatorID;
         this.userDefinedOperatorID = userDefinedOperatorID;
+        if (userDefinedOperatorName != null && 
userDefinedOperatorName.isEmpty()) {
+            throw new IllegalArgumentException("Empty string operator name is 
not allowed");
+        }
+        this.userDefinedOperatorName = userDefinedOperatorName;
+        if (userDefinedOperatorUid != null && 
userDefinedOperatorUid.isEmpty()) {
+            throw new IllegalArgumentException("Empty string operator uid is 
not allowed");
+        }
+        this.userDefinedOperatorUid = userDefinedOperatorUid;
     }
 
     public static OperatorIDPair of(
-            OperatorID generatedOperatorID, @Nullable OperatorID 
userDefinedOperatorID) {
-        return new OperatorIDPair(generatedOperatorID, userDefinedOperatorID);
+            OperatorID generatedOperatorID,
+            @Nullable OperatorID userDefinedOperatorID,
+            @Nullable String operatorName,
+            @Nullable String operatorUid) {
+        return new OperatorIDPair(
+                generatedOperatorID, userDefinedOperatorID, operatorName, 
operatorUid);
     }
 
     public static OperatorIDPair generatedIDOnly(OperatorID 
generatedOperatorID) {
-        return new OperatorIDPair(generatedOperatorID, null);
+        return new OperatorIDPair(generatedOperatorID, null, null, null);
     }
 
     public OperatorID getGeneratedOperatorID() {
         return generatedOperatorID;
     }
 
+    /** Returns the user-defined operator ID; empty (never {@code null}) when none was set. */
     public Optional<OperatorID> getUserDefinedOperatorID() {
         return Optional.ofNullable(userDefinedOperatorID);
     }
+
+    @Nullable
+    public String getUserDefinedOperatorName() {
+        return userDefinedOperatorName;
+    }
+
+    @Nullable
+    public String getUserDefinedOperatorUid() {
+        return userDefinedOperatorUid;
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
index f445727e6ef..682e9fdcea1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
@@ -24,7 +24,7 @@ import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
 import org.apache.flink.runtime.checkpoint.metadata.MetadataSerializer;
 import org.apache.flink.runtime.checkpoint.metadata.MetadataSerializers;
-import org.apache.flink.runtime.checkpoint.metadata.MetadataV4Serializer;
+import org.apache.flink.runtime.checkpoint.metadata.MetadataV5Serializer;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -85,7 +85,7 @@ public class Checkpoints {
 
     public static void storeCheckpointMetadata(
             CheckpointMetadata checkpointMetadata, DataOutputStream out) 
throws IOException {
-        storeCheckpointMetadata(checkpointMetadata, out, 
MetadataV4Serializer.INSTANCE);
+        storeCheckpointMetadata(checkpointMetadata, out, 
MetadataV5Serializer.INSTANCE);
     }
 
     public static void storeCheckpointMetadata(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlan.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlan.java
index 9253799a17a..44e81b3de7a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlan.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlan.java
@@ -228,19 +228,21 @@ public class DefaultCheckpointPlan implements 
CheckpointPlan {
             Map<OperatorID, OperatorState> operatorStates) {
         // Completes the operator state for the fully finished operators
         for (ExecutionJobVertex jobVertex : 
fullyFinishedOrFinishedOnRestoreVertices.values()) {
-            for (OperatorIDPair operatorID : jobVertex.getOperatorIDs()) {
+            for (OperatorIDPair operatorIDPair : jobVertex.getOperatorIDs()) {
                 OperatorState operatorState =
-                        
operatorStates.get(operatorID.getGeneratedOperatorID());
+                        
operatorStates.get(operatorIDPair.getGeneratedOperatorID());
                 checkState(
                         operatorState == null || 
!operatorState.hasSubtaskStates(),
                         "There should be no states or only coordinator state 
reported for fully finished operators");
 
                 operatorState =
                         new FullyFinishedOperatorState(
-                                operatorID.getGeneratedOperatorID(),
+                                operatorIDPair.getUserDefinedOperatorName(),
+                                operatorIDPair.getUserDefinedOperatorUid(),
+                                operatorIDPair.getGeneratedOperatorID(),
                                 jobVertex.getParallelism(),
                                 jobVertex.getMaxParallelism());
-                operatorStates.put(operatorID.getGeneratedOperatorID(), 
operatorState);
+                operatorStates.put(operatorIDPair.getGeneratedOperatorID(), 
operatorState);
             }
         }
     }
@@ -260,6 +262,8 @@ public class DefaultCheckpointPlan implements 
CheckpointPlan {
                 if (operatorState == null) {
                     operatorState =
                             new OperatorState(
+                                    
operatorIDPair.getUserDefinedOperatorName(),
+                                    operatorIDPair.getUserDefinedOperatorUid(),
                                     operatorIDPair.getGeneratedOperatorID(),
                                     jobVertex.getParallelism(),
                                     jobVertex.getMaxParallelism());
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FullyFinishedOperatorState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FullyFinishedOperatorState.java
index 1726ec93c15..cd405a6cd2b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FullyFinishedOperatorState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FullyFinishedOperatorState.java
@@ -31,8 +31,13 @@ public class FullyFinishedOperatorState extends 
OperatorState {
 
     private static final long serialVersionUID = 1L;
 
-    public FullyFinishedOperatorState(OperatorID operatorID, int parallelism, 
int maxParallelism) {
-        super(operatorID, parallelism, maxParallelism);
+    public FullyFinishedOperatorState(
+            @Nullable String operatorName,
+            @Nullable String operatorUid,
+            OperatorID operatorID,
+            int parallelism,
+            int maxParallelism) {
+        super(operatorName, operatorUid, operatorID, parallelism, 
maxParallelism);
     }
 
     @Override
@@ -55,7 +60,11 @@ public class FullyFinishedOperatorState extends 
OperatorState {
     @Override
     public OperatorState copyAndDiscardInFlightData() {
         return new FullyFinishedOperatorState(
-                getOperatorID(), getParallelism(), getMaxParallelism());
+                getOperatorName().orElse(null),
+                getOperatorUid().orElse(null),
+                getOperatorID(),
+                getParallelism(),
+                getMaxParallelism());
     }
 
     @Override
@@ -75,6 +84,10 @@ public class FullyFinishedOperatorState extends 
OperatorState {
     @Override
     public String toString() {
         return "FullyFinishedOperatorState("
-                + "operatorID: "
+                + "name: "
+                + getOperatorName()
+                + ", uid: "
+                + getOperatorUid()
+                + ", operatorID: "
                 + getOperatorID()
                 + ", parallelism: "
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
index d377ca5ec7b..1707ed93800 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
@@ -33,6 +33,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -47,6 +48,12 @@ public class OperatorState implements CompositeStateHandle {
 
     private static final long serialVersionUID = -4845578005863201810L;
 
+    /** The name of the operator. */
+    @Nullable private String operatorName;
+
+    /** The Uid of the operator. */
+    @Nullable private String operatorUid;
+
     /** The id of the operator. */
     private final OperatorID operatorID;
 
@@ -65,7 +72,12 @@ public class OperatorState implements CompositeStateHandle {
      */
     private final int maxParallelism;
 
-    public OperatorState(OperatorID operatorID, int parallelism, int 
maxParallelism) {
+    public OperatorState(
+            @Nullable String operatorName,
+            @Nullable String operatorUid,
+            OperatorID operatorID,
+            int parallelism,
+            int maxParallelism) {
         if (parallelism > maxParallelism) {
             throw new IllegalArgumentException(
                     String.format(
@@ -73,6 +85,8 @@ public class OperatorState implements CompositeStateHandle {
                             parallelism, maxParallelism));
         }
 
+        this.operatorName = operatorName;
+        this.operatorUid = operatorUid;
         this.operatorID = operatorID;
 
         this.operatorSubtaskStates = 
CollectionUtil.newHashMapWithExpectedSize(parallelism);
@@ -81,6 +95,22 @@ public class OperatorState implements CompositeStateHandle {
         this.maxParallelism = maxParallelism;
     }
 
+    public Optional<String> getOperatorName() {
+        return Optional.ofNullable(operatorName);
+    }
+
+    public void setOperatorName(@Nullable String operatorName) {
+        this.operatorName = operatorName;
+    }
+
+    public Optional<String> getOperatorUid() {
+        return Optional.ofNullable(operatorUid);
+    }
+
+    public void setOperatorUid(@Nullable String operatorUid) {
+        this.operatorUid = operatorUid;
+    }
+
     public OperatorID getOperatorID() {
         return operatorID;
     }
@@ -145,14 +175,18 @@ public class OperatorState implements 
CompositeStateHandle {
         return maxParallelism;
     }
 
-    public OperatorState copyWithNewOperatorID(OperatorID newOperatorId) {
-        OperatorState newState = new OperatorState(newOperatorId, parallelism, 
maxParallelism);
+    public OperatorState copyWithNewIDs(@Nullable String newOperatorUid, 
OperatorID newOperatorId) {
+        OperatorState newState =
+                new OperatorState(
+                        operatorName, newOperatorUid, newOperatorId, 
parallelism, maxParallelism);
         operatorSubtaskStates.forEach(newState::putState);
         return newState;
     }
 
     public OperatorState copyAndDiscardInFlightData() {
-        OperatorState newState = new OperatorState(operatorID, parallelism, 
maxParallelism);
+        OperatorState newState =
+                new OperatorState(
+                        operatorName, operatorUid, operatorID, parallelism, 
maxParallelism);
 
         for (Map.Entry<Integer, OperatorSubtaskState> 
originalSubtaskStateEntry :
                 operatorSubtaskStates.entrySet()) {
@@ -256,7 +290,11 @@ public class OperatorState implements CompositeStateHandle 
{
         // KvStates are always null in 1.1. Don't print this as it might
         // confuse users that don't care about how we store it internally.
         return "OperatorState("
-                + "operatorID: "
+                + "name: "
+                + getOperatorName()
+                + ", uid: "
+                + getOperatorUid()
+                + ", operatorID: "
                 + operatorID
                 + ", parallelism: "
                 + parallelism
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 2f2cc6f6f67..7712d905d03 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -407,9 +407,8 @@ public class PendingCheckpoint implements Checkpoint {
             if (operatorSubtaskStates != null && 
operatorSubtaskStates.isTaskDeployedAsFinished()) {
                 checkpointPlan.reportTaskFinishedOnRestore(vertex);
             } else {
-                List<OperatorIDPair> operatorIDs = 
vertex.getJobVertex().getOperatorIDs();
-                for (OperatorIDPair operatorID : operatorIDs) {
-                    updateOperatorState(vertex, operatorSubtaskStates, 
operatorID);
+                for (OperatorIDPair operatorIDPair : 
vertex.getJobVertex().getOperatorIDs()) {
+                    updateOperatorState(vertex, operatorSubtaskStates, 
operatorIDPair);
                 }
 
                 if (operatorSubtaskStates != null && 
operatorSubtaskStates.isTaskFinished()) {
@@ -464,22 +463,27 @@ public class PendingCheckpoint implements Checkpoint {
     private void updateOperatorState(
             ExecutionVertex vertex,
             TaskStateSnapshot operatorSubtaskStates,
-            OperatorIDPair operatorID) {
-        OperatorState operatorState = 
operatorStates.get(operatorID.getGeneratedOperatorID());
+            OperatorIDPair operatorIDPair) {
+        OperatorState operatorState = 
operatorStates.get(operatorIDPair.getGeneratedOperatorID());
 
         if (operatorState == null) {
             operatorState =
                     new OperatorState(
-                            operatorID.getGeneratedOperatorID(),
+                            operatorIDPair.getUserDefinedOperatorName(),
+                            operatorIDPair.getUserDefinedOperatorUid(),
+                            operatorIDPair.getGeneratedOperatorID(),
                             vertex.getTotalNumberOfParallelSubtasks(),
                             vertex.getMaxParallelism());
-            operatorStates.put(operatorID.getGeneratedOperatorID(), 
operatorState);
+            operatorStates.put(operatorIDPair.getGeneratedOperatorID(), 
operatorState);
+        } else {
+            
operatorState.setOperatorName(operatorIDPair.getUserDefinedOperatorName());
+            
operatorState.setOperatorUid(operatorIDPair.getUserDefinedOperatorUid());
         }
         OperatorSubtaskState operatorSubtaskState =
                 operatorSubtaskStates == null
                         ? null
                         : operatorSubtaskStates.getSubtaskStateByOperatorID(
-                                operatorID.getGeneratedOperatorID());
+                                operatorIDPair.getGeneratedOperatorID());
 
         if (operatorSubtaskState != null) {
             operatorState.putState(vertex.getParallelSubtaskIndex(), 
operatorSubtaskState);
@@ -507,6 +511,8 @@ public class PendingCheckpoint implements Checkpoint {
             if (operatorState == null) {
                 operatorState =
                         new OperatorState(
+                                null,
+                                null,
                                 operatorId,
                                 coordinatorInfo.currentParallelism(),
                                 coordinatorInfo.maxParallelism());
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
index 07088d901f6..917e81f6d49 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
@@ -119,6 +119,8 @@ public class StateAssignmentOperation {
                 if (operatorState == null) {
                     operatorState =
                             new OperatorState(
+                                    
operatorIDPair.getUserDefinedOperatorName(),
+                                    operatorIDPair.getUserDefinedOperatorUid(),
                                     operatorID,
                                     executionJobVertex.getParallelism(),
                                     executionJobVertex.getMaxParallelism());
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataSerializers.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataSerializers.java
index ee370b009b9..8646c48e1cf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataSerializers.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataSerializers.java
@@ -30,13 +30,14 @@ import java.util.Map;
 public class MetadataSerializers {
 
     private static final Map<Integer, MetadataSerializer> SERIALIZERS =
-            CollectionUtil.newHashMapWithExpectedSize(4);
+            CollectionUtil.newHashMapWithExpectedSize(5);
 
     static {
         registerSerializer(MetadataV1Serializer.INSTANCE);
         registerSerializer(MetadataV2Serializer.INSTANCE);
         registerSerializer(MetadataV3Serializer.INSTANCE);
         registerSerializer(MetadataV4Serializer.INSTANCE);
+        registerSerializer(MetadataV5Serializer.INSTANCE);
     }
 
     private static void registerSerializer(MetadataSerializer serializer) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2Serializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2Serializer.java
index 49f6f3a3e2e..c993acb1c4d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2Serializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2Serializer.java
@@ -118,7 +118,8 @@ public class MetadataV2Serializer extends 
MetadataV2V3SerializerBase implements
         dis.readInt();
 
         // Add task state
-        final OperatorState taskState = new OperatorState(jobVertexId, 
parallelism, maxParallelism);
+        final OperatorState taskState =
+                new OperatorState(null, null, jobVertexId, parallelism, 
maxParallelism);
 
         // Sub task states
         final int numSubTaskStates = dis.readInt();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3Serializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3Serializer.java
index 83b5201e5ec..fc6df97ce8d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3Serializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3Serializer.java
@@ -66,7 +66,7 @@ public class MetadataV3Serializer extends 
MetadataV2V3SerializerBase implements
             new ChannelStateHandleSerializer();
 
     /** Singleton, not meant to be instantiated. */
-    private MetadataV3Serializer() {}
+    protected MetadataV3Serializer() {}
 
     @Override
     public int getVersion() {
@@ -80,7 +80,7 @@ public class MetadataV3Serializer extends 
MetadataV2V3SerializerBase implements
     @Override
     public void serialize(CheckpointMetadata checkpointMetadata, 
DataOutputStream dos)
             throws IOException {
-        INSTANCE.serializeMetadata(checkpointMetadata, dos);
+        serializeMetadata(checkpointMetadata, dos);
     }
 
     @Override
@@ -163,11 +163,12 @@ public class MetadataV3Serializer extends 
MetadataV2V3SerializerBase implements
             checkState(
                     coordinateState == null,
                     "Coordinator State should be null for fully finished 
operator state");
-            return new FullyFinishedOperatorState(jobVertexId, parallelism, 
maxParallelism);
+            return new FullyFinishedOperatorState(
+                    null, null, jobVertexId, parallelism, maxParallelism);
         }
 
         final OperatorState operatorState =
-                new OperatorState(jobVertexId, parallelism, maxParallelism);
+                new OperatorState(null, null, jobVertexId, parallelism, 
maxParallelism);
 
         // Coordinator state
         operatorState.setCoordinatorState(coordinateState);
@@ -189,7 +190,7 @@ public class MetadataV3Serializer extends 
MetadataV2V3SerializerBase implements
         return operatorState;
     }
 
-    private SubtaskAndFinishedState 
deserializeSubtaskIndexAndFinishedState(DataInputStream dis)
+    protected SubtaskAndFinishedState 
deserializeSubtaskIndexAndFinishedState(DataInputStream dis)
             throws IOException {
         int storedSubtaskIndex = dis.readInt();
         if (storedSubtaskIndex < 0) {
@@ -306,7 +307,7 @@ public class MetadataV3Serializer extends 
MetadataV2V3SerializerBase implements
         return INSTANCE.deserializeResultSubpartitionStateHandle(dis, null);
     }
 
-    private static class SubtaskAndFinishedState {
+    static class SubtaskAndFinishedState {
 
         final int subtaskIndex;
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV4Serializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV4Serializer.java
index 758e03a7ecb..88262fa805a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV4Serializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV4Serializer.java
@@ -31,7 +31,7 @@ import java.io.ObjectOutputStream;
  * serialization.
  */
 @Internal
-public class MetadataV4Serializer implements MetadataSerializer {
+public class MetadataV4Serializer extends MetadataV3Serializer {
 
     public static final MetadataSerializer INSTANCE = new 
MetadataV4Serializer();
     public static final int VERSION = 4;
@@ -45,15 +45,14 @@ public class MetadataV4Serializer implements 
MetadataSerializer {
     public CheckpointMetadata deserialize(
             DataInputStream dis, ClassLoader userCodeClassLoader, String 
externalPointer)
             throws IOException {
-        return MetadataV3Serializer.INSTANCE
-                .deserialize(dis, userCodeClassLoader, externalPointer)
+        return super.deserialize(dis, userCodeClassLoader, externalPointer)
                 .withProperties(deserializeProperties(dis));
     }
 
     @Override
     public void serialize(CheckpointMetadata checkpointMetadata, 
DataOutputStream dos)
             throws IOException {
-        MetadataV3Serializer.INSTANCE.serialize(checkpointMetadata, dos);
+        super.serialize(checkpointMetadata, dos);
         serializeProperties(checkpointMetadata.getCheckpointProperties(), dos);
     }
 
@@ -66,7 +65,7 @@ public class MetadataV4Serializer implements 
MetadataSerializer {
         }
     }
 
-    private static void serializeProperties(CheckpointProperties properties, 
DataOutputStream dos)
+    private void serializeProperties(CheckpointProperties properties, 
DataOutputStream dos)
             throws IOException {
         new ObjectOutputStream(dos).writeObject(properties); // closed outside
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV5Serializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV5Serializer.java
new file mode 100644
index 00000000000..ca0d568ccf4
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV5Serializer.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.metadata;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.FinishedOperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.FullyFinishedOperatorState;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+
+import javax.annotation.Nullable;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** V5 serializer that writes the operator name and uid in front of each operator state. */
+@Internal
+public class MetadataV5Serializer extends MetadataV4Serializer {
+
+    public static final MetadataSerializer INSTANCE = new 
MetadataV5Serializer();
+    public static final int VERSION = 5;
+
+    @Override
+    public int getVersion() {
+        return VERSION;
+    }
+
+    @Override
+    protected void serializeOperatorState(OperatorState operatorState, 
DataOutputStream dos)
+            throws IOException {
+        if (operatorState.getOperatorName().isPresent()
+                && operatorState.getOperatorName().get().isEmpty()) {
+            throw new IllegalArgumentException("Empty string operator name is 
not allowed");
+        }
+        if (operatorState.getOperatorUid().isPresent()
+                && operatorState.getOperatorUid().get().isEmpty()) {
+            throw new IllegalArgumentException("Empty string operator uid is 
not allowed");
+        }
+        // Name and UID are null in the whole chain when not provided, but since writeUTF cannot
+        // store null we write an empty string instead (hence empty values are rejected above)
+        dos.writeUTF(operatorState.getOperatorName().orElse(""));
+        dos.writeUTF(operatorState.getOperatorUid().orElse(""));
+        super.serializeOperatorState(operatorState, dos);
+    }
+
+    @Override
+    protected OperatorState deserializeOperatorState(
+            DataInputStream dis, @Nullable DeserializationContext context) 
throws IOException {
+        final String name = dis.readUTF();
+        final String uid = dis.readUTF();
+
+        final OperatorID jobVertexId = new OperatorID(dis.readLong(), 
dis.readLong());
+        final int parallelism = dis.readInt();
+        final int maxParallelism = dis.readInt();
+
+        ByteStreamStateHandle coordinateState =
+                deserializeAndCheckByteStreamStateHandle(dis, context);
+
+        final int numSubTaskStates = dis.readInt();
+        if (numSubTaskStates < 0) {
+            checkState(
+                    coordinateState == null,
+                    "Coordinator State should be null for fully finished 
operator state");
+            return new FullyFinishedOperatorState(
+                    name.isEmpty() ? null : name,
+                    uid.isEmpty() ? null : uid,
+                    jobVertexId,
+                    parallelism,
+                    maxParallelism);
+        }
+
+        // Name and UID are null in the whole chain when not provided, but since they are stored
+        // as empty strings in metadata we convert empty strings back to null here
+        final OperatorState operatorState =
+                new OperatorState(
+                        name.isEmpty() ? null : name,
+                        uid.isEmpty() ? null : uid,
+                        jobVertexId,
+                        parallelism,
+                        maxParallelism);
+
+        // Coordinator state
+        operatorState.setCoordinatorState(coordinateState);
+
+        // Sub task states
+        for (int j = 0; j < numSubTaskStates; j++) {
+            SubtaskAndFinishedState subtaskAndFinishedState =
+                    deserializeSubtaskIndexAndFinishedState(dis);
+            if (subtaskAndFinishedState.isFinished) {
+                operatorState.putState(
+                        subtaskAndFinishedState.subtaskIndex,
+                        FinishedOperatorSubtaskState.INSTANCE);
+            } else {
+                final OperatorSubtaskState subtaskState = 
deserializeSubtaskState(dis, context);
+                operatorState.putState(subtaskAndFinishedState.subtaskIndex, 
subtaskState);
+            }
+        }
+
+        return operatorState;
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 358c62d8610..7fd640c49cc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -984,16 +984,21 @@ public class StreamingJobGraphGenerator {
 
         JobVertexID jobVertexId = new JobVertexID(hash);
 
-        List<Tuple2<byte[], byte[]>> chainedOperators =
+        List<ChainedOperatorHashInfo> chainedOperators =
                 chainInfo.getChainedOperatorHashes(streamNodeId);
         List<OperatorIDPair> operatorIDPairs = new ArrayList<>();
         if (chainedOperators != null) {
-            for (Tuple2<byte[], byte[]> chainedOperator : chainedOperators) {
+            for (ChainedOperatorHashInfo chainedOperator : chainedOperators) {
                 OperatorID userDefinedOperatorID =
-                        chainedOperator.f1 == null ? null : new 
OperatorID(chainedOperator.f1);
+                        chainedOperator.getUserDefinedOperatorID() == null
+                                ? null
+                                : new 
OperatorID(chainedOperator.getUserDefinedOperatorID());
                 operatorIDPairs.add(
                         OperatorIDPair.of(
-                                new OperatorID(chainedOperator.f0), 
userDefinedOperatorID));
+                                new 
OperatorID(chainedOperator.getGeneratedOperatorID()),
+                                userDefinedOperatorID,
+                                
chainedOperator.getStreamNode().getOperatorName(),
+                                
chainedOperator.getStreamNode().getTransformationUID()));
             }
         }
 
@@ -2109,7 +2114,7 @@ public class StreamingJobGraphGenerator {
         private final Integer startNodeId;
         private final Map<Integer, byte[]> hashes;
         private final List<Map<Integer, byte[]>> legacyHashes;
-        private final Map<Integer, List<Tuple2<byte[], byte[]>>> 
chainedOperatorHashes;
+        private final Map<Integer, List<ChainedOperatorHashInfo>> 
chainedOperatorHashes;
         private final Map<Integer, ChainedSourceInfo> chainedSources;
         private final List<OperatorCoordinator.Provider> coordinatorProviders;
         private final StreamGraph streamGraph;
@@ -2141,7 +2146,7 @@ public class StreamingJobGraphGenerator {
             return startNodeId;
         }
 
-        private List<Tuple2<byte[], byte[]>> getChainedOperatorHashes(int 
startNodeId) {
+        private List<ChainedOperatorHashInfo> getChainedOperatorHashes(int 
startNodeId) {
             return chainedOperatorHashes.get(startNodeId);
         }
 
@@ -2161,13 +2166,15 @@ public class StreamingJobGraphGenerator {
             recordChainedNode(currentNodeId);
             StreamNode streamNode = streamGraph.getStreamNode(currentNodeId);
 
-            List<Tuple2<byte[], byte[]>> operatorHashes =
+            List<ChainedOperatorHashInfo> operatorHashes =
                     chainedOperatorHashes.computeIfAbsent(startNodeId, k -> 
new ArrayList<>());
 
             byte[] primaryHashBytes = hashes.get(currentNodeId);
 
             for (Map<Integer, byte[]> legacyHash : legacyHashes) {
-                operatorHashes.add(new Tuple2<>(primaryHashBytes, 
legacyHash.get(currentNodeId)));
+                operatorHashes.add(
+                        new ChainedOperatorHashInfo(
+                                primaryHashBytes, 
legacyHash.get(currentNodeId), streamNode));
             }
 
             streamNode
@@ -2200,6 +2207,33 @@ public class StreamingJobGraphGenerator {
         }
     }
 
+    private static final class ChainedOperatorHashInfo {
+        private final byte[] generatedOperatorID;
+        private final byte[] userDefinedOperatorID;
+        private final StreamNode streamNode;
+
+        ChainedOperatorHashInfo(
+                final byte[] generatedOperatorID,
+                final byte[] userDefinedOperatorID,
+                final StreamNode streamNode) {
+            this.generatedOperatorID = generatedOperatorID;
+            this.userDefinedOperatorID = userDefinedOperatorID;
+            this.streamNode = streamNode;
+        }
+
+        public byte[] getGeneratedOperatorID() {
+            return generatedOperatorID;
+        }
+
+        public byte[] getUserDefinedOperatorID() {
+            return userDefinedOperatorID;
+        }
+
+        public StreamNode getStreamNode() {
+            return streamNode;
+        }
+    }
+
     private static final class ChainedSourceInfo {
         private final StreamConfig operatorConfig;
         private final StreamConfig.SourceInputConfig inputConfig;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/OperatorIDPairTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/OperatorIDPairTest.java
new file mode 100644
index 00000000000..db35952daa9
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/OperatorIDPairTest.java
@@ -0,0 +1,28 @@
+package org.apache.flink.runtime;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertThrows;
+
+class OperatorIDPairTest {
+    @Test
+    void testEmptyNameShouldThrowException() {
+        final IllegalArgumentException exception =
+                assertThrows(
+                        IllegalArgumentException.class,
+                        () -> OperatorIDPair.of(new OperatorID(), null, "", 
null));
+        assertThat(exception.getMessage()).contains("Empty string operator 
name is not allowed");
+    }
+
+    @Test
+    void testEmptyUidShouldThrowException() {
+        final IllegalArgumentException exception =
+                assertThrows(
+                        IllegalArgumentException.class,
+                        () -> OperatorIDPair.of(new OperatorID(), null, null, 
""));
+        assertThat(exception.getMessage()).contains("Empty string operator uid 
is not allowed");
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
index 1dd9f2f2b69..83627510b1e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
@@ -661,7 +661,8 @@ class CheckpointCoordinatorRestoringTest {
 
         // prepare vertex1 state
         for (Tuple2<JobVertexID, OperatorID> id : Arrays.asList(id1, id2)) {
-            OperatorState taskState = new OperatorState(id.f1, parallelism1, 
maxParallelism1);
+            OperatorState taskState =
+                    new OperatorState(null, null, id.f1, parallelism1, 
maxParallelism1);
             operatorStates.put(id.f1, taskState);
             for (int index = 0; index < taskState.getParallelism(); index++) {
                 OperatorSubtaskState subtaskState =
@@ -681,7 +682,8 @@ class CheckpointCoordinatorRestoringTest {
                 new ArrayList<>();
         // prepare vertex2 state
         for (Tuple2<JobVertexID, OperatorID> id : Arrays.asList(id3, id4)) {
-            OperatorState operatorState = new OperatorState(id.f1, 
parallelism2, maxParallelism2);
+            OperatorState operatorState =
+                    new OperatorState(null, null, id.f1, parallelism2, 
maxParallelism2);
             operatorStates.put(id.f1, operatorState);
             List<ChainedStateHandle<OperatorStateHandle>> 
expectedManagedOperatorState =
                     new ArrayList<>();
@@ -1085,7 +1087,7 @@ class CheckpointCoordinatorRestoringTest {
         Map<OperatorID, OperatorState> operatorStates = new HashMap<>();
         operatorStates.put(
                 op1.getGeneratedOperatorID(),
-                new FullyFinishedOperatorState(op1.getGeneratedOperatorID(), 
1, 1));
+                new FullyFinishedOperatorState(null, null, 
op1.getGeneratedOperatorID(), 1, 1));
         CompletedCheckpoint completedCheckpoint =
                 new CompletedCheckpoint(
                         graph.getJobID(),
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
index 92faa1438a6..212930044fa 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
@@ -168,7 +168,7 @@ class CheckpointMetadataLoadingTest {
         final int maxParallelism = 1234;
 
         final OperatorState state =
-                new OperatorState(operatorID, maxParallelism / 2, 
maxParallelism);
+                new OperatorState(null, null, operatorID, maxParallelism / 2, 
maxParallelism);
         state.setCoordinatorState(new 
ByteStreamStateHandle("coordinatorState", new byte[0]));
 
         final CompletedCheckpointStorageLocation testSavepoint =
@@ -226,7 +226,8 @@ class CheckpointMetadataLoadingTest {
                                 
singleton(createNewResultSubpartitionStateHandle(10, rnd)))
                         .build();
 
-        final OperatorState state = new OperatorState(operatorId, parallelism, 
parallelism);
+        final OperatorState state =
+                new OperatorState(null, null, operatorId, parallelism, 
parallelism);
         state.putState(0, subtaskState);
 
         return createSavepointWithOperatorState(checkpointId, state);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index ebcfc0f5afe..4b52d1007f0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -232,7 +232,7 @@ class CheckpointStateRestoreTest {
         // --- (2) Checkpoint misses state for a jobVertex (should work) ---
         Map<OperatorID, OperatorState> checkpointTaskStates = new HashMap<>();
         {
-            OperatorState taskState = new OperatorState(operatorId1, 3, 3);
+            OperatorState taskState = new OperatorState(null, null, 
operatorId1, 3, 3);
             taskState.putState(0, OperatorSubtaskState.builder().build());
             taskState.putState(1, OperatorSubtaskState.builder().build());
             taskState.putState(2, OperatorSubtaskState.builder().build());
@@ -264,7 +264,7 @@ class CheckpointStateRestoreTest {
 
         // There is no task for this
         {
-            OperatorState taskState = new OperatorState(newOperatorID, 1, 1);
+            OperatorState taskState = new OperatorState(null, null, 
newOperatorID, 1, 1);
             taskState.putState(0, OperatorSubtaskState.builder().build());
 
             checkpointTaskStates.put(newOperatorID, taskState);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
index 376f251a3d7..05a088e551b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
@@ -231,7 +231,8 @@ abstract class CompletedCheckpointStoreTest {
         OperatorID operatorID = new OperatorID();
 
         Map<OperatorID, OperatorState> operatorGroupState = new HashMap<>();
-        OperatorState operatorState = new OperatorState(operatorID, 
numberOfStates, numberOfStates);
+        OperatorState operatorState =
+                new OperatorState(null, null, operatorID, numberOfStates, 
numberOfStates);
         operatorGroupState.put(operatorID, operatorState);
 
         for (int i = 0; i < numberOfStates; i++) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlanTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlanTest.java
index 2f08e0c36a6..3065a22b3f2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlanTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlanTest.java
@@ -75,7 +75,7 @@ class DefaultCheckpointPlanTest {
         CheckpointPlan checkpointPlan = createCheckpointPlan(executionGraph);
 
         Map<OperatorID, OperatorState> operatorStates = new HashMap<>();
-        OperatorState operatorState = new OperatorState(operatorId, 2, 2);
+        OperatorState operatorState = new OperatorState(null, null, 
operatorId, 2, 2);
         operatorState.putState(
                 0, 
createSubtaskStateWithUnionListState(TempDirUtils.newFile(temporaryFolder)));
         operatorStates.put(operatorId, operatorState);
@@ -109,7 +109,7 @@ class DefaultCheckpointPlanTest {
         CheckpointPlan checkpointPlan = createCheckpointPlan(executionGraph);
 
         Map<OperatorID, OperatorState> operatorStates = new HashMap<>();
-        OperatorState operatorState = new OperatorState(operatorId, 2, 2);
+        OperatorState operatorState = new OperatorState(null, null, 
operatorId, 2, 2);
         operatorState.putState(
                 0, 
createSubtaskStateWithUnionListState(TempDirUtils.newFile(temporaryFolder)));
 
@@ -210,7 +210,7 @@ class DefaultCheckpointPlanTest {
         CheckpointPlan checkpointPlan = createCheckpointPlan(executionGraph);
 
         Map<OperatorID, OperatorState> operatorStates = new HashMap<>();
-        OperatorState operatorState = new OperatorState(finishedOperatorID, 1, 
256);
+        OperatorState operatorState = new OperatorState(null, null, 
finishedOperatorID, 1, 256);
         operatorState.setCoordinatorState(new TestingStreamStateHandle());
         operatorStates.put(finishedOperatorID, operatorState);
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FullyFinishedOperatorStateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FullyFinishedOperatorStateTest.java
index 3dd2ef9e436..5b49be69fe6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FullyFinishedOperatorStateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FullyFinishedOperatorStateTest.java
@@ -40,7 +40,8 @@ class FullyFinishedOperatorStateTest {
 
     @Test
     void testFullyFinishedOperatorState() {
-        OperatorState operatorState = new FullyFinishedOperatorState(new 
OperatorID(), 5, 256);
+        OperatorState operatorState =
+                new FullyFinishedOperatorState(null, null, new OperatorID(), 
5, 256);
         assertThat(operatorState.isFullyFinished()).isTrue();
 
         assertThat(operatorState.getSubtaskStates()).isEmpty();
@@ -66,7 +67,7 @@ class FullyFinishedOperatorStateTest {
         Tuple2<List<StateObject>, OperatorSubtaskState> opSubtaskStates2 =
                 generateSampleOperatorSubtaskState();
 
-        OperatorState operatorState = new OperatorState(new OperatorID(), 2, 
256);
+        OperatorState operatorState = new OperatorState(null, null, new 
OperatorID(), 2, 256);
         operatorState.putState(0, opSubtaskStates1.f1);
         operatorState.putState(1, opSubtaskStates2.f1);
         ByteStreamStateHandle coordinatorState =
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index e530edef9b1..4dbc22dc34a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -225,7 +225,7 @@ class PendingCheckpointTest {
                         false, CheckpointType.CHECKPOINT, false, false, false, 
false, false, false);
         QueueExecutor executor = new QueueExecutor();
 
-        OperatorState state = new OperatorState(new OperatorID(), 1, 256);
+        OperatorState state = new OperatorState(null, null, new OperatorID(), 
1, 256);
         OperatorSubtaskStateMock subtaskStateMock = new 
OperatorSubtaskStateMock();
         OperatorSubtaskState subtaskState = subtaskStateMock.getSubtaskState();
         state.putState(0, subtaskState);
@@ -287,7 +287,7 @@ class PendingCheckpointTest {
                                 
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
         pending.acknowledgeTask(ATTEMPT_ID, null, 
mock(CheckpointMetrics.class));
         final OperatorState expectedState =
-                new OperatorState(OPERATOR_ID, PARALLELISM, MAX_PARALLELISM);
+                new OperatorState(null, null, OPERATOR_ID, PARALLELISM, 
MAX_PARALLELISM);
         assertThat(Collections.singletonMap(OPERATOR_ID, expectedState))
                 .isEqualTo(pending.getOperatorStates());
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java
index dc05fa91de9..fd08793e402 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java
@@ -100,7 +100,7 @@ class StateAssignmentOperationTest {
     @Test
     void testRepartitionSplitDistributeStates() {
         OperatorID operatorID = new OperatorID();
-        OperatorState operatorState = new OperatorState(operatorID, 2, 4);
+        OperatorState operatorState = new OperatorState(null, null, 
operatorID, 2, 4);
 
         Map<String, OperatorStateHandle.StateMetaInfo> metaInfoMap1 = new 
HashMap<>(1);
         metaInfoMap1.put(
@@ -130,7 +130,7 @@ class StateAssignmentOperationTest {
     @Test
     void testRepartitionUnionState() {
         OperatorID operatorID = new OperatorID();
-        OperatorState operatorState = new OperatorState(operatorID, 2, 4);
+        OperatorState operatorState = new OperatorState(null, null, 
operatorID, 2, 4);
 
         Map<String, OperatorStateHandle.StateMetaInfo> metaInfoMap1 = new 
HashMap<>(2);
         metaInfoMap1.put(
@@ -164,7 +164,7 @@ class StateAssignmentOperationTest {
     @Test
     void testRepartitionBroadcastState() {
         OperatorID operatorID = new OperatorID();
-        OperatorState operatorState = new OperatorState(operatorID, 2, 4);
+        OperatorState operatorState = new OperatorState(null, null, 
operatorID, 2, 4);
 
         Map<String, OperatorStateHandle.StateMetaInfo> metaInfoMap1 = new 
HashMap<>(2);
         metaInfoMap1.put(
@@ -202,7 +202,7 @@ class StateAssignmentOperationTest {
     @Test
     void testRepartitionBroadcastStateWithNullSubtaskState() {
         OperatorID operatorID = new OperatorID();
-        OperatorState operatorState = new OperatorState(operatorID, 2, 4);
+        OperatorState operatorState = new OperatorState(null, null, 
operatorID, 2, 4);
 
         // Only the subtask 0 reports the states.
         Map<String, OperatorStateHandle.StateMetaInfo> metaInfoMap1 = new 
HashMap<>(2);
@@ -226,7 +226,7 @@ class StateAssignmentOperationTest {
     @Test
     void testRepartitionBroadcastStateWithEmptySubtaskState() {
         OperatorID operatorID = new OperatorID();
-        OperatorState operatorState = new OperatorState(operatorID, 2, 4);
+        OperatorState operatorState = new OperatorState(null, null, 
operatorID, 2, 4);
 
         // Only the subtask 0 reports the states.
         Map<String, OperatorStateHandle.StateMetaInfo> metaInfoMap1 = new 
HashMap<>(2);
@@ -254,7 +254,7 @@ class StateAssignmentOperationTest {
     @Test
     void testReDistributeCombinedPartitionableStates() {
         OperatorID operatorID = new OperatorID();
-        OperatorState operatorState = new OperatorState(operatorID, 2, 4);
+        OperatorState operatorState = new OperatorState(null, null, 
operatorID, 2, 4);
 
         Map<String, OperatorStateHandle.StateMetaInfo> metaInfoMap1 = new 
HashMap<>(6);
         metaInfoMap1.put(
@@ -808,7 +808,7 @@ class StateAssignmentOperationTest {
         List<OperatorID> operatorIds = buildOperatorIds(2);
         Map<OperatorID, OperatorState> states = new HashMap<>();
         Random random = new Random();
-        OperatorState upstreamState = new OperatorState(operatorIds.get(0), 2, 
MAX_P);
+        OperatorState upstreamState = new OperatorState(null, null, 
operatorIds.get(0), 2, MAX_P);
         OperatorSubtaskState state =
                 OperatorSubtaskState.builder()
                         .setResultSubpartitionState(
@@ -894,9 +894,9 @@ class StateAssignmentOperationTest {
         List<OperatorID> operatorIds = buildOperatorIds(2);
         Map<OperatorID, OperatorState> states = new HashMap<>();
         OperatorState upstreamState =
-                new OperatorState(operatorIds.get(0), upstreamParallelism, 
MAX_P);
+                new OperatorState(null, null, operatorIds.get(0), 
upstreamParallelism, MAX_P);
         OperatorState downstreamState =
-                new OperatorState(operatorIds.get(1), downstreamParallelism, 
MAX_P);
+                new OperatorState(null, null, operatorIds.get(1), 
downstreamParallelism, MAX_P);
 
         states.put(operatorIds.get(0), upstreamState);
         states.put(operatorIds.get(1), downstreamState);
@@ -971,7 +971,8 @@ class StateAssignmentOperationTest {
                 
buildOperatorStates(Collections.singletonList(operatorIds.get(1)), 3);
 
         // Create an operator state marked as finished
-        OperatorState operatorState = new 
FullyFinishedOperatorState(operatorIds.get(0), 3, 256);
+        OperatorState operatorState =
+                new FullyFinishedOperatorState(null, null, operatorIds.get(0), 
3, 256);
         states.put(operatorIds.get(0), operatorState);
 
         Map<OperatorID, ExecutionJobVertex> vertices =
@@ -1043,7 +1044,7 @@ class StateAssignmentOperationTest {
 
     @Test
     void assigningStateHandlesCanNotBeNull() {
-        OperatorState state = new OperatorState(new OperatorID(), 1, MAX_P);
+        OperatorState state = new OperatorState(null, null, new OperatorID(), 
1, MAX_P);
 
         List<KeyedStateHandle> managedKeyedStateHandles =
                 StateAssignmentOperation.getManagedKeyedStateHandles(state, 
KeyGroupRange.of(0, 1));
@@ -1071,7 +1072,8 @@ class StateAssignmentOperationTest {
                                 Function.identity(),
                                 operatorID -> {
                                     OperatorState state =
-                                            new OperatorState(operatorID, 
numSubTasks, MAX_P);
+                                            new OperatorState(
+                                                    "", "", operatorID, 
numSubTasks, MAX_P);
                                     for (int i = 0; i < numSubTasks; i++) {
                                         state.putState(
                                                 i,
@@ -1241,7 +1243,8 @@ class StateAssignmentOperationTest {
                 new JobVertex(
                         operatorID.toHexString(),
                         new JobVertexID(),
-                        singletonList(OperatorIDPair.of(operatorID, 
userDefinedOperatorId)));
+                        singletonList(
+                                OperatorIDPair.of(operatorID, 
userDefinedOperatorId, "", "")));
         jobVertex.setInvokableClass(NoOpInvokable.class);
         jobVertex.setParallelism(parallelism);
         return jobVertex;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/VertexFinishedStateCheckerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/VertexFinishedStateCheckerTest.java
index 0cf366a47c5..d21ca01f44e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/VertexFinishedStateCheckerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/VertexFinishedStateCheckerTest.java
@@ -69,7 +69,7 @@ class VertexFinishedStateCheckerTest {
         final JobVertexID jobVertexID1 = new JobVertexID();
         final JobVertexID jobVertexID2 = new JobVertexID();
         // The op1 has uidHash set.
-        OperatorIDPair op1 = OperatorIDPair.of(new OperatorID(), new 
OperatorID());
+        OperatorIDPair op1 = OperatorIDPair.of(new OperatorID(), new 
OperatorID(), "", "");
         OperatorIDPair op2 = OperatorIDPair.generatedIDOnly(new OperatorID());
         OperatorIDPair op3 = OperatorIDPair.generatedIDOnly(new OperatorID());
 
@@ -82,10 +82,10 @@ class VertexFinishedStateCheckerTest {
         Map<OperatorID, OperatorState> operatorStates = new HashMap<>();
         operatorStates.put(
                 useUidHash ? op1.getUserDefinedOperatorID().get() : 
op1.getGeneratedOperatorID(),
-                new FullyFinishedOperatorState(op1.getGeneratedOperatorID(), 
1, 1));
+                new FullyFinishedOperatorState(null, null, 
op1.getGeneratedOperatorID(), 1, 1));
         operatorStates.put(
                 op2.getGeneratedOperatorID(),
-                new OperatorState(op2.getGeneratedOperatorID(), 1, 1));
+                new OperatorState(null, null, op2.getGeneratedOperatorID(), 1, 
1));
 
         Set<ExecutionJobVertex> vertices = new HashSet<>();
         vertices.add(graph.getJobVertex(jobVertexID1));
@@ -292,13 +292,13 @@ class VertexFinishedStateCheckerTest {
             OperatorID operatorId, 
VertexFinishedStateChecker.VertexFinishedState finishedState) {
         switch (finishedState) {
             case ALL_RUNNING:
-                return new OperatorState(operatorId, 2, 2);
+                return new OperatorState(null, null, operatorId, 2, 2);
             case PARTIALLY_FINISHED:
-                OperatorState operatorState = new OperatorState(operatorId, 2, 
2);
+                OperatorState operatorState = new OperatorState(null, null, 
operatorId, 2, 2);
                 operatorState.putState(0, 
FinishedOperatorSubtaskState.INSTANCE);
                 return operatorState;
             case FULLY_FINISHED:
-                return new FullyFinishedOperatorState(operatorId, 2, 2);
+                return new FullyFinishedOperatorState(null, null, operatorId, 
2, 2);
             default:
                 throw new UnsupportedOperationException(
                         "Not supported finished state: " + finishedState);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java
index 63836bdc920..6cfb3a605cf 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java
@@ -97,7 +97,13 @@ public class CheckpointTestUtils {
                                 + numFullyFinishedTaskStates);
 
         for (int stateIdx = 0; stateIdx < numAllRunningTaskStates; ++stateIdx) 
{
-            OperatorState taskState = new OperatorState(new OperatorID(), 
numSubtasksPerTask, 128);
+            OperatorState taskState =
+                    new OperatorState(
+                            "operatorName-" + stateIdx,
+                            "operatorUid-" + stateIdx,
+                            new OperatorID(),
+                            numSubtasksPerTask,
+                            128);
             randomlySetCoordinatorState(taskState, random);
             randomlySetSubtaskState(
                     taskState, IntStream.range(0, 
numSubtasksPerTask).toArray(), random, basePath);
@@ -105,7 +111,8 @@ public class CheckpointTestUtils {
         }
 
         for (int stateIdx = 0; stateIdx < numPartlyFinishedTaskStates; 
++stateIdx) {
-            OperatorState taskState = new OperatorState(new OperatorID(), 
numSubtasksPerTask, 128);
+            OperatorState taskState =
+                    new OperatorState(null, null, new OperatorID(), 
numSubtasksPerTask, 128);
             randomlySetCoordinatorState(taskState, random);
             randomlySetSubtaskState(
                     taskState,
@@ -122,7 +129,8 @@ public class CheckpointTestUtils {
 
         for (int stateIdx = 0; stateIdx < numFullyFinishedTaskStates; 
++stateIdx) {
             taskStates.add(
-                    new FullyFinishedOperatorState(new OperatorID(), 
numSubtasksPerTask, 128));
+                    new FullyFinishedOperatorState(
+                            null, null, new OperatorID(), numSubtasksPerTask, 
128));
         }
 
         return taskStates;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV5SerializerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV5SerializerTest.java
new file mode 100644
index 00000000000..f0a82a01124
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV5SerializerTest.java
@@ -0,0 +1,101 @@
+package org.apache.flink.runtime.checkpoint.metadata;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.runtime.checkpoint.OperatorState;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.Random;
+
+import static java.util.Collections.emptyList;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertThrows;
+
+/** Tests that {@link MetadataV5Serializer} round-trips operator name/UID and rejects empty ones. */
+class MetadataV5SerializerTest {
+
+    private static final MetadataSerializer INSTANCE = 
MetadataV5Serializer.INSTANCE; // serializer under test
+
+    private static final Random RND = new Random(); // unseeded; only handed to CheckpointTestUtils in beforeEach
+
+    private Collection<OperatorState> taskStates; // rebuilt in beforeEach; the empty-value tests mutate its first element
+
+    private CheckpointMetadata metadata; // built from taskStates in beforeEach; the input passed to INSTANCE.serialize
+
+    @BeforeEach
+    public void beforeEach(@TempDir Path tempDir) throws IOException { // tempDir is supplied through @TempDir
+        taskStates =
+                CheckpointTestUtils.createOperatorStates(
+                        RND, tempDir.toUri().toString(), 1, 0, 0, 0); // NOTE(review): presumably one all-running operator state - confirm against createOperatorStates' parameter order
+        metadata = new CheckpointMetadata(1L, taskStates, emptyList(), null);
+    }
+
+    @Test
+    void testSerializeOperatorUidAndName() throws IOException { // round-trip: serialize, deserialize, compare name and UID
+        try (ByteArrayOutputStream out = new ByteArrayOutputStream();
+                DataOutputStream dos = new DataOutputStream(out)) {
+            INSTANCE.serialize(metadata, dos);
+
+            try (DataInputStream dis =
+                    new DataInputStream(new 
ByteArrayInputStream(out.toByteArray()))) { // read back exactly the bytes written above
+                CheckpointMetadata deserializedMetadata =
+                        INSTANCE.deserialize(dis, 
metadata.getClass().getClassLoader(), "");
+                Collection<OperatorState> operatorStates = 
deserializedMetadata.getOperatorStates();
+                assertThat(operatorStates).hasSize(1);
+                OperatorState operatorState = operatorStates.iterator().next();
+                assertThat(operatorState.getOperatorName()).isPresent();
+                
assertThat(operatorState.getOperatorName().get()).isEqualTo("operatorName-0"); // CheckpointTestUtils names all-running states "operatorName-" + stateIdx
+                assertThat(operatorState.getOperatorUid()).isPresent();
+                
assertThat(operatorState.getOperatorUid().get()).isEqualTo("operatorUid-0"); // CheckpointTestUtils assigns "operatorUid-" + stateIdx
+            }
+        }
+    }
+
+    @Test
+    void testSerializeOperatorNameWithEmptyValue() throws IOException {
+        taskStates.iterator().next().setOperatorName(""); // presumably visible through metadata, which was built from taskStates
+        testSerializeOperatorWithEmptyValue("Empty string operator name is not 
allowed");
+    }
+
+    @Test
+    void testSerializeOperatorUidWithEmptyValue() throws IOException {
+        taskStates.iterator().next().setOperatorUid(""); // presumably visible through metadata, which was built from taskStates
+        testSerializeOperatorWithEmptyValue("Empty string operator uid is not 
allowed");
+    }
+
+    void testSerializeOperatorWithEmptyValue(String exceptionMessage) throws 
IOException { // shared helper, not a @Test: expects serialize(metadata) to fail with a message containing exceptionMessage
+        try (ByteArrayOutputStream out = new ByteArrayOutputStream();
+                DataOutputStream dos = new DataOutputStream(out)) {
+            final IllegalArgumentException exception =
+                    assertThrows( // NOTE(review): statically imported from JUnit 4's org.junit.Assert, while this class otherwise uses JUnit 5 annotations and AssertJ
+                            IllegalArgumentException.class,
+                            () -> INSTANCE.serialize(metadata, dos));
+            assertThat(exception.getMessage()).contains(exceptionMessage);
+        }
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestUtils.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestUtils.java
index b5054b0aeb7..ef486068c93 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestUtils.java
@@ -69,7 +69,7 @@ public class TestUtils {
         Collection<OperatorState> operatorStates = new 
ArrayList<>(operatorIds.length);
 
         for (OperatorID operatorId : operatorIds) {
-            final OperatorState operatorState = new OperatorState(operatorId, 
1, 42);
+            final OperatorState operatorState = new OperatorState(null, null, 
operatorId, 1, 42);
             final OperatorSubtaskState subtaskState =
                     OperatorSubtaskState.builder()
                             .setManagedOperatorState(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
index 5882da9656f..a1c5c3cabc5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
@@ -667,7 +667,8 @@ class OperatorCoordinatorSchedulerTest {
             boolean restartAllOnFailover)
             throws Exception {
 
-        final OperatorIDPair opIds = OperatorIDPair.of(new OperatorID(), 
provider.getOperatorId());
+        final OperatorIDPair opIds =
+                OperatorIDPair.of(new OperatorID(), provider.getOperatorId(), 
"", "");
         final JobVertex vertex =
                 new JobVertex(
                         "Vertex with OperatorCoordinator",
@@ -948,7 +949,7 @@ class OperatorCoordinatorSchedulerTest {
     }
 
     private static OperatorState createOperatorState(OperatorID id, byte[] 
coordinatorState) {
-        final OperatorState state = new OperatorState(id, 10, 16384);
+        final OperatorState state = new OperatorState(null, null, id, 10, 
16384);
         state.setCoordinatorState(new ByteStreamStateHandle("name", 
coordinatorState));
         return state;
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
index 79113f7fb39..cffc163ed0a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
@@ -153,7 +153,7 @@ class SchedulerUtilsTest {
 
     private CompletedCheckpoint buildCheckpoint(KeyedStateHandle incremental) {
         OperatorID operatorID = new OperatorID();
-        OperatorState operatorState = new OperatorState(operatorID, 1, 1);
+        OperatorState operatorState = new OperatorState(null, null, 
operatorID, 1, 1);
         operatorState.putState(
                 0, 
OperatorSubtaskState.builder().setManagedKeyedState(incremental).build());
         return new CompletedCheckpoint(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
index 59f2f6ebefe..f414ed03374 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
@@ -362,7 +362,7 @@ class SharedStateRegistryTest {
                         new StateHandleID(stateId));
 
         OperatorID operatorID = new OperatorID();
-        OperatorState operatorState = new OperatorState(operatorID, 1, 1);
+        OperatorState operatorState = new OperatorState(null, null, 
operatorID, 1, 1);
         operatorState.putState(
                 0, 
OperatorSubtaskState.builder().setManagedKeyedState(initialHandle).build());
 

Reply via email to