This is an automated email from the ASF dual-hosted git repository.
gaborgsomogyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 02110ceb58d [FLINK-36001][state] Store operator name and UID in state metadata
02110ceb58d is described below
commit 02110ceb58dc6838ac965477d174c6760eff53f2
Author: Gabor Somogyi <[email protected]>
AuthorDate: Fri Sep 13 15:13:41 2024 +0200
[FLINK-36001][state] Store operator name and UID in state metadata
---
.../apache/flink/state/api/SavepointWriter.java | 22 ++--
.../state/api/StateBootstrapTransformation.java | 35 +++---
.../api/output/OperatorSubtaskStateReducer.java | 17 ++-
.../StateBootstrapTransformationWithID.java | 12 +-
.../api/runtime/metadata/OperatorStateSpecV2.java | 17 ++-
.../api/runtime/metadata/SavepointMetadataV2.java | 2 +-
.../api/SavepointWriterUidModificationITCase.java | 126 ++++++++++++++++++---
.../api/StateBootstrapTransformationTest.java | 11 +-
.../api/input/BroadcastStateInputFormatTest.java | 3 +-
.../state/api/input/KeyedStateInputFormatTest.java | 14 +--
.../state/api/input/ListStateInputFormatTest.java | 3 +-
.../input/StreamOperatorContextBuilderTest.java | 2 +-
.../state/api/input/UnionStateInputFormatTest.java | 3 +-
.../flink/state/api/input/WindowReaderTest.java | 2 +-
.../api/output/SavepointOutputFormatTest.java | 3 +-
.../org/apache/flink/runtime/OperatorIDPair.java | 38 ++++++-
.../flink/runtime/checkpoint/Checkpoints.java | 4 +-
.../runtime/checkpoint/DefaultCheckpointPlan.java | 12 +-
.../checkpoint/FullyFinishedOperatorState.java | 19 +++-
.../flink/runtime/checkpoint/OperatorState.java | 48 +++++++-
.../runtime/checkpoint/PendingCheckpoint.java | 22 ++--
.../checkpoint/StateAssignmentOperation.java | 2 +
.../checkpoint/metadata/MetadataSerializers.java | 3 +-
.../checkpoint/metadata/MetadataV2Serializer.java | 3 +-
.../checkpoint/metadata/MetadataV3Serializer.java | 13 ++-
.../checkpoint/metadata/MetadataV4Serializer.java | 9 +-
.../checkpoint/metadata/MetadataV5Serializer.java | 121 ++++++++++++++++++++
.../api/graph/StreamingJobGraphGenerator.java | 50 ++++++--
.../apache/flink/runtime/OperatorIDPairTest.java | 28 +++++
.../CheckpointCoordinatorRestoringTest.java | 8 +-
.../checkpoint/CheckpointMetadataLoadingTest.java | 5 +-
.../checkpoint/CheckpointStateRestoreTest.java | 4 +-
.../checkpoint/CompletedCheckpointStoreTest.java | 3 +-
.../checkpoint/DefaultCheckpointPlanTest.java | 6 +-
.../checkpoint/FullyFinishedOperatorStateTest.java | 5 +-
.../runtime/checkpoint/PendingCheckpointTest.java | 4 +-
.../checkpoint/StateAssignmentOperationTest.java | 29 ++---
.../checkpoint/VertexFinishedStateCheckerTest.java | 12 +-
.../checkpoint/metadata/CheckpointTestUtils.java | 14 ++-
.../metadata/MetadataV5SerializerTest.java | 101 +++++++++++++++++
.../apache/flink/runtime/jobmaster/TestUtils.java | 2 +-
.../OperatorCoordinatorSchedulerTest.java | 5 +-
.../runtime/scheduler/SchedulerUtilsTest.java | 2 +-
.../runtime/state/SharedStateRegistryTest.java | 2 +-
44 files changed, 684 insertions(+), 162 deletions(-)
diff --git
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointWriter.java
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointWriter.java
index 01f51ca997a..f8b8d464f2d 100644
---
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointWriter.java
+++
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointWriter.java
@@ -377,7 +377,7 @@ public class SavepointWriter {
newOperatorState
.getBootstrapTransformation()
.writeOperatorState(
-
newOperatorState.getOperatorID(),
+
newOperatorState.getOperatorIdentifier(),
stateBackend,
config,
metadata.getMaxParallelism(),
@@ -402,16 +402,24 @@ public class SavepointWriter {
value.getOperatorStates().stream()
.map(
operatorState -> {
- OperatorIdentifier operatorIdentifier =
- OperatorIdentifier.forUidHash(
- operatorState
-
.getOperatorID()
-
.toHexString());
+ OperatorIdentifier operatorIdentifier;
+ if
(operatorState.getOperatorUid().isPresent()) {
+ operatorIdentifier =
+ OperatorIdentifier.forUid(
+
operatorState.getOperatorUid().get());
+ } else {
+ operatorIdentifier =
+
OperatorIdentifier.forUidHash(
+ operatorState
+
.getOperatorID()
+
.toHexString());
+ }
final OperatorIdentifier
transformedIdentifier =
uidTransformationMap.remove(operatorIdentifier);
if (transformedIdentifier != null) {
- return
operatorState.copyWithNewOperatorID(
+ return
operatorState.copyWithNewIDs(
+
transformedIdentifier.getUid().orElse(null),
transformedIdentifier.getOperatorId());
}
return operatorState;
diff --git
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/StateBootstrapTransformation.java
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/StateBootstrapTransformation.java
index ddc83a4adf9..6e6f55bcc62 100644
---
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/StateBootstrapTransformation.java
+++
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/StateBootstrapTransformation.java
@@ -29,7 +29,6 @@ import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.runtime.checkpoint.OperatorState;
-import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.state.api.output.BootstrapStreamTaskRunner;
import org.apache.flink.state.api.output.OperatorSubtaskStateReducer;
@@ -115,7 +114,7 @@ public class StateBootstrapTransformation<T> {
}
/**
- * @param operatorID The operator id for the stream operator.
+ * @param operatorIdentifier The operator identifier for the stream
operator.
* @param stateBackend The state backend for the job.
* @param config Additional configurations applied to the bootstrap stream
tasks.
* @param globalMaxParallelism Global max parallelism set for the
savepoint.
@@ -123,7 +122,7 @@ public class StateBootstrapTransformation<T> {
* @return The operator subtask states for this bootstrap transformation.
*/
DataStream<OperatorState> writeOperatorState(
- OperatorID operatorID,
+ OperatorIdentifier operatorIdentifier,
StateBackend stateBackend,
Configuration config,
int globalMaxParallelism,
@@ -131,27 +130,36 @@ public class StateBootstrapTransformation<T> {
int localMaxParallelism = getMaxParallelism(globalMaxParallelism);
return writeOperatorSubtaskStates(
- operatorID, stateBackend, config, savepointPath,
localMaxParallelism)
+ operatorIdentifier,
+ stateBackend,
+ config,
+ savepointPath,
+ localMaxParallelism)
.transform(
"reduce(OperatorSubtaskState)",
TypeInformation.of(OperatorState.class),
new GroupReduceOperator<>(
- new OperatorSubtaskStateReducer(operatorID,
localMaxParallelism)))
+ new OperatorSubtaskStateReducer(
+ operatorIdentifier,
localMaxParallelism)))
.forceNonParallel();
}
@VisibleForTesting
SingleOutputStreamOperator<TaggedOperatorSubtaskState>
writeOperatorSubtaskStates(
- OperatorID operatorID,
+ OperatorIdentifier operatorIdentifier,
StateBackend stateBackend,
Path savepointPath,
int localMaxParallelism) {
return writeOperatorSubtaskStates(
- operatorID, stateBackend, new Configuration(), savepointPath,
localMaxParallelism);
+ operatorIdentifier,
+ stateBackend,
+ new Configuration(),
+ savepointPath,
+ localMaxParallelism);
}
private SingleOutputStreamOperator<TaggedOperatorSubtaskState>
writeOperatorSubtaskStates(
- OperatorID operatorID,
+ OperatorIdentifier operatorIdentifier,
StateBackend stateBackend,
Configuration additionalConfig,
Path savepointPath,
@@ -161,7 +169,8 @@ public class StateBootstrapTransformation<T> {
operator = stream.getExecutionEnvironment().clean(operator);
- final StreamConfig config = getConfig(operatorID, stateBackend,
additionalConfig, operator);
+ final StreamConfig config =
+ getConfig(operatorIdentifier, stateBackend, additionalConfig,
operator);
BootstrapStreamTaskRunner<T> operatorRunner =
new BootstrapStreamTaskRunner<>(config, localMaxParallelism);
@@ -173,7 +182,7 @@ public class StateBootstrapTransformation<T> {
SingleOutputStreamOperator<TaggedOperatorSubtaskState> subtaskStates =
input.transform(
- operatorID.toHexString(),
+
operatorIdentifier.getOperatorId().toHexString(),
TypeInformation.of(TaggedOperatorSubtaskState.class),
operatorRunner)
.setMaxParallelism(localMaxParallelism);
@@ -191,7 +200,7 @@ public class StateBootstrapTransformation<T> {
@VisibleForTesting
StreamConfig getConfig(
- OperatorID operatorID,
+ OperatorIdentifier operatorIdentifier,
StateBackend stateBackend,
Configuration additionalConfig,
StreamOperator<TaggedOperatorSubtaskState> operator) {
@@ -217,8 +226,8 @@ public class StateBootstrapTransformation<T> {
}
config.setStreamOperator(operator);
- config.setOperatorName(operatorID.toHexString());
- config.setOperatorID(operatorID);
+
config.setOperatorName(operatorIdentifier.getOperatorId().toHexString());
+ config.setOperatorID(operatorIdentifier.getOperatorId());
config.setStateBackend(stateBackend);
config.setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase.STATE_BACKEND,
1.0);
config.serializeAllConfigs();
diff --git
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/OperatorSubtaskStateReducer.java
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/OperatorSubtaskStateReducer.java
index 24aecb3acb4..464833cb0be 100644
---
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/OperatorSubtaskStateReducer.java
+++
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/OperatorSubtaskStateReducer.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
-import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.state.api.OperatorIdentifier;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
@@ -41,15 +41,15 @@ public class OperatorSubtaskStateReducer
private static final long serialVersionUID = 1L;
- private final OperatorID operatorID;
+ private final OperatorIdentifier operatorIdentifier;
private final int maxParallelism;
- public OperatorSubtaskStateReducer(OperatorID operatorID, int
maxParallelism) {
- Preconditions.checkNotNull(operatorID, "Operator id must not be
null.");
+ public OperatorSubtaskStateReducer(OperatorIdentifier operatorIdentifier,
int maxParallelism) {
+ Preconditions.checkNotNull(operatorIdentifier, "Operator identifier
must not be null.");
Preconditions.checkState(maxParallelism > 1);
- this.operatorID = operatorID;
+ this.operatorIdentifier = operatorIdentifier;
this.maxParallelism = maxParallelism;
}
@@ -64,7 +64,12 @@ public class OperatorSubtaskStateReducer
StreamSupport.stream(values.spliterator(),
false).collect(Collectors.toList());
OperatorState operatorState =
- new OperatorState(operatorID, subtasks.size(), maxParallelism);
+ new OperatorState(
+ "ReducedOperatorState",
+ operatorIdentifier.getUid().orElse(null),
+ operatorIdentifier.getOperatorId(),
+ subtasks.size(),
+ maxParallelism);
for (TaggedOperatorSubtaskState value : subtasks) {
operatorState.putState(value.index, value.state);
diff --git
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/StateBootstrapTransformationWithID.java
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/StateBootstrapTransformationWithID.java
index 2dabf1abe45..7a2dcfb54e0 100644
---
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/StateBootstrapTransformationWithID.java
+++
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/StateBootstrapTransformationWithID.java
@@ -20,6 +20,7 @@ package org.apache.flink.state.api.runtime;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.state.api.OperatorIdentifier;
import org.apache.flink.state.api.StateBootstrapTransformation;
import org.apache.flink.util.Preconditions;
@@ -31,17 +32,18 @@ import org.apache.flink.util.Preconditions;
@Internal
public class StateBootstrapTransformationWithID<T> {
- private final OperatorID operatorID;
+ private final OperatorIdentifier operatorIdentifier;
private final StateBootstrapTransformation<T> bootstrapTransformation;
public StateBootstrapTransformationWithID(
- OperatorID operatorID, StateBootstrapTransformation<T>
bootstrapTransformation) {
- this.operatorID = Preconditions.checkNotNull(operatorID);
+ OperatorIdentifier operatorIdentifier,
+ StateBootstrapTransformation<T> bootstrapTransformation) {
+ this.operatorIdentifier =
Preconditions.checkNotNull(operatorIdentifier);
this.bootstrapTransformation =
Preconditions.checkNotNull(bootstrapTransformation);
}
- public OperatorID getOperatorID() {
- return operatorID;
+ public OperatorIdentifier getOperatorIdentifier() {
+ return operatorIdentifier;
}
public StateBootstrapTransformation<T> getBootstrapTransformation() {
diff --git
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/OperatorStateSpecV2.java
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/OperatorStateSpecV2.java
index ff6a98cd3af..dd339e3dcdc 100644
---
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/OperatorStateSpecV2.java
+++
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/OperatorStateSpecV2.java
@@ -20,7 +20,7 @@ package org.apache.flink.state.api.runtime.metadata;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.checkpoint.OperatorState;
-import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.state.api.OperatorIdentifier;
import org.apache.flink.state.api.runtime.StateBootstrapTransformationWithID;
import org.apache.flink.util.Preconditions;
@@ -34,7 +34,7 @@ import javax.annotation.Nullable;
@Internal
class OperatorStateSpecV2 {
- private final OperatorID id;
+ private final OperatorIdentifier identifier;
@Nullable private final OperatorState existingState;
@@ -50,13 +50,18 @@ class OperatorStateSpecV2 {
}
private OperatorStateSpecV2(OperatorState existingState) {
- this.id = existingState.getOperatorID();
+ if (existingState.getOperatorUid().isPresent()) {
+ this.identifier =
OperatorIdentifier.forUid(existingState.getOperatorUid().get());
+ } else {
+ this.identifier =
+
OperatorIdentifier.forUidHash(existingState.getOperatorID().toHexString());
+ }
this.existingState = existingState;
this.newOperatorStateTransformation = null;
}
private OperatorStateSpecV2(StateBootstrapTransformationWithID<?>
transformation) {
- this.id = transformation.getOperatorID();
+ this.identifier = transformation.getOperatorIdentifier();
this.newOperatorStateTransformation = transformation;
this.existingState = null;
}
@@ -71,7 +76,7 @@ class OperatorStateSpecV2 {
OperatorState asExistingState() {
Preconditions.checkState(
- isExistingState(), "OperatorState %s is not an existing
state.", id);
+ isExistingState(), "OperatorState %s is not an existing
state.", identifier);
return existingState;
}
@@ -80,7 +85,7 @@ class OperatorStateSpecV2 {
Preconditions.checkState(
isNewStateTransformation(),
"OperatorState %s is not a new state defined with
BootstrapTransformation",
- id);
+ identifier);
return (StateBootstrapTransformationWithID<T>)
newOperatorStateTransformation;
}
}
diff --git
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadataV2.java
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadataV2.java
index 72cfa27781f..08c40d54763 100644
---
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadataV2.java
+++
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadataV2.java
@@ -119,7 +119,7 @@ public class SavepointMetadataV2 {
operatorStateIndex.put(
id,
OperatorStateSpecV2.newWithTransformation(
- new StateBootstrapTransformationWithID<>(id,
transformation)));
+ new StateBootstrapTransformationWithID<>(identifier,
transformation)));
}
/** @return List of {@link OperatorState} that already exists within the
savepoint. */
diff --git
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterUidModificationITCase.java
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterUidModificationITCase.java
index 1b6b665d687..dcd7f32a0c2 100644
---
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterUidModificationITCase.java
+++
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterUidModificationITCase.java
@@ -24,10 +24,13 @@ import
org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
+import org.apache.flink.state.api.runtime.SavepointLoader;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.test.junit5.MiniClusterExtension;
@@ -43,8 +46,10 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
@@ -85,7 +90,7 @@ public class SavepointWriterUidModificationITCase {
OperatorIdentifier.forUidHash(uidHash),
OperatorIdentifier.forUid(uid)));
- runAndValidate(newSavepoint, Tuple2.of(STATE_1, uid));
+ runAndValidate(newSavepoint, ValidationParameters.of(STATE_1, uid,
null));
}
@Test
@@ -107,7 +112,29 @@ public class SavepointWriterUidModificationITCase {
OperatorIdentifier.forUid(uid),
OperatorIdentifier.forUid(newUid)));
- runAndValidate(newSavepoint, Tuple2.of(STATE_1, newUid));
+ runAndValidate(newSavepoint, ValidationParameters.of(STATE_1, newUid,
null));
+ }
+
+ @Test
+ public void testChangeUidHashOnly(@TempDir Path tmp) throws Exception {
+ final String uid = "uid";
+ final String newUidHash = new AbstractID().toHexString();
+ final String originalSavepoint =
+ bootstrapState(
+ tmp,
+ (env, writer) ->
+ writer.withOperator(
+ OperatorIdentifier.forUid(uid),
bootstrap(env, STATE_1)));
+ final String newSavepoint =
+ modifySavepoint(
+ tmp,
+ originalSavepoint,
+ writer ->
+ writer.changeOperatorIdentifier(
+ OperatorIdentifier.forUid(uid),
+
OperatorIdentifier.forUidHash(newUidHash)));
+
+ runAndValidate(newSavepoint, ValidationParameters.of(STATE_1, null,
newUidHash));
}
@Test
@@ -136,7 +163,10 @@ public class SavepointWriterUidModificationITCase {
OperatorIdentifier.forUid(uid2),
OperatorIdentifier.forUid(uid1)));
- runAndValidate(newSavepoint, Tuple2.of(STATE_1, uid2),
Tuple2.of(STATE_2, uid1));
+ runAndValidate(
+ newSavepoint,
+ ValidationParameters.of(STATE_1, uid2, null),
+ ValidationParameters.of(STATE_2, uid1, null));
}
private static String bootstrapState(
@@ -184,21 +214,57 @@ public class SavepointWriterUidModificationITCase {
return newSavepointPath;
}
- @SafeVarargs
private static void runAndValidate(
- String savepointPath, Tuple2<Collection<Integer>, String>...
assertions)
- throws Exception {
+ String savepointPath, ValidationParameters...
validationParameters) throws Exception {
+ // validate metadata
+ CheckpointMetadata metadata =
SavepointLoader.loadSavepointMetadata(savepointPath);
+
assertThat(metadata.getOperatorStates().size()).isEqualTo(validationParameters.length);
+ for (ValidationParameters validationParameter : validationParameters) {
+ if (validationParameter.getUid() != null) {
+ Set<OperatorState> operators =
+ metadata.getOperatorStates().stream()
+ .filter(
+ os ->
+ os.getOperatorUid().isPresent()
+ && os.getOperatorUid()
+ .get()
+ .equals(
+
validationParameter
+
.getUid()))
+ .collect(Collectors.toSet());
+ assertThat(operators.size()).isEqualTo(1);
+ assertThat(operators.iterator().next().getOperatorID())
+ .isEqualTo(
+
OperatorIdentifier.forUid(validationParameter.getUid())
+ .getOperatorId());
+ } else {
+ Set<OperatorState> operators =
+ metadata.getOperatorStates().stream()
+ .filter(
+ os ->
+ os.getOperatorID()
+ .toHexString()
+
.equals(validationParameter.getUidHash()))
+ .collect(Collectors.toSet());
+ assertThat(operators.size()).isEqualTo(1);
+
assertThat(operators.iterator().next().getOperatorUid()).isEmpty();
+ }
+ }
+
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// prepare collection of state
final List<CloseableIterator<Integer>> iterators = new ArrayList<>();
- for (Tuple2<Collection<Integer>, String> assertion : assertions) {
- iterators.add(
- env.fromData(assertion.f0)
+ for (ValidationParameters validationParameter : validationParameters) {
+ SingleOutputStreamOperator<Integer> stream =
+ env.fromData(validationParameter.getState())
.keyBy(v -> v)
- .map(new StateReader())
- .uid(assertion.f1)
- .collectAsync());
+ .map(new StateReader());
+ if (validationParameter.getUid() != null) {
+
iterators.add(stream.uid(validationParameter.getUid()).collectAsync());
+ } else {
+
iterators.add(stream.setUidHash(validationParameter.getUidHash()).collectAsync());
+ }
}
// run job
@@ -208,10 +274,10 @@ public class SavepointWriterUidModificationITCase {
env.executeAsync(streamGraph);
// validate state
- for (int i = 0; i < assertions.length; i++) {
+ for (int i = 0; i < validationParameters.length; i++) {
assertThat(iterators.get(i))
.toIterable()
- .containsExactlyInAnyOrderElementsOf(assertions[i].f0);
+
.containsExactlyInAnyOrderElementsOf(validationParameters[i].getState());
}
for (CloseableIterator<Integer> iterator : iterators) {
@@ -219,6 +285,36 @@ public class SavepointWriterUidModificationITCase {
}
}
+ private static class ValidationParameters {
+ private final Collection<Integer> state;
+ private final String uid;
+ private final String uidHash;
+
+ public ValidationParameters(
+ final Collection<Integer> state, final String uid, final
String uidHash) {
+ this.state = state;
+ this.uid = uid;
+ this.uidHash = uidHash;
+ }
+
+ public Collection<Integer> getState() {
+ return state;
+ }
+
+ public String getUid() {
+ return uid;
+ }
+
+ public String getUidHash() {
+ return uidHash;
+ }
+
+ public static ValidationParameters of(
+ final Collection<Integer> state, final String uid, final
String uidHash) {
+ return new ValidationParameters(state, uid, uidHash);
+ }
+ }
+
/** A savepoint writer function. */
public static class StateBootstrapper extends
KeyedStateBootstrapFunction<Integer, Integer> {
private transient ValueState<Integer> state;
diff --git
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/StateBootstrapTransformationTest.java
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/StateBootstrapTransformationTest.java
index 776ab093db3..5928c809c6d 100644
---
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/StateBootstrapTransformationTest.java
+++
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/StateBootstrapTransformationTest.java
@@ -28,7 +28,6 @@ import
org.apache.flink.state.api.functions.BroadcastStateBootstrapFunction;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
import org.apache.flink.state.api.functions.StateBootstrapFunction;
import org.apache.flink.state.api.output.TaggedOperatorSubtaskState;
-import org.apache.flink.state.api.runtime.OperatorIDGenerator;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -54,7 +53,7 @@ public class StateBootstrapTransformationTest extends
AbstractTestBaseJUnit4 {
int maxParallelism = transformation.getMaxParallelism(4);
DataStream<TaggedOperatorSubtaskState> result =
transformation.writeOperatorSubtaskStates(
- OperatorIDGenerator.fromUid("uid"),
+ OperatorIdentifier.forUid("uid"),
new HashMapStateBackend(),
new Path(),
maxParallelism);
@@ -79,7 +78,7 @@ public class StateBootstrapTransformationTest extends
AbstractTestBaseJUnit4 {
int maxParallelism = transformation.getMaxParallelism(10);
DataStream<TaggedOperatorSubtaskState> result =
transformation.writeOperatorSubtaskStates(
- OperatorIDGenerator.fromUid("uid"),
+ OperatorIdentifier.forUid("uid"),
new HashMapStateBackend(),
new Path(),
maxParallelism);
@@ -104,7 +103,7 @@ public class StateBootstrapTransformationTest extends
AbstractTestBaseJUnit4 {
int maxParallelism = transformation.getMaxParallelism(4);
DataStream<TaggedOperatorSubtaskState> result =
transformation.writeOperatorSubtaskStates(
- OperatorIDGenerator.fromUid("uid"),
+ OperatorIdentifier.forUid("uid"),
new HashMapStateBackend(),
new Path(),
maxParallelism);
@@ -130,7 +129,7 @@ public class StateBootstrapTransformationTest extends
AbstractTestBaseJUnit4 {
int maxParallelism = transformation.getMaxParallelism(4);
DataStream<TaggedOperatorSubtaskState> result =
transformation.writeOperatorSubtaskStates(
- OperatorIDGenerator.fromUid("uid"),
+ OperatorIdentifier.forUid("uid"),
new HashMapStateBackend(),
new Path(),
maxParallelism);
@@ -153,7 +152,7 @@ public class StateBootstrapTransformationTest extends
AbstractTestBaseJUnit4 {
StreamConfig config =
transformation.getConfig(
- OperatorIDGenerator.fromUid("uid"),
+ OperatorIdentifier.forUid("uid"),
new HashMapStateBackend(),
new Configuration(),
null);
diff --git
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/BroadcastStateInputFormatTest.java
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/BroadcastStateInputFormatTest.java
index 83ec5f4cbaf..62933ef7f7a 100644
---
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/BroadcastStateInputFormatTest.java
+++
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/BroadcastStateInputFormatTest.java
@@ -57,7 +57,8 @@ public class BroadcastStateInputFormatTest {
testHarness.processElement2(new StreamRecord<>(3));
OperatorSubtaskState subtaskState = testHarness.snapshot(0, 0);
- OperatorState state = new
OperatorState(OperatorIDGenerator.fromUid("uid"), 1, 4);
+ OperatorState state =
+ new OperatorState(null, null,
OperatorIDGenerator.fromUid("uid"), 1, 4);
state.putState(0, subtaskState);
OperatorStateInputSplit split =
diff --git
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/KeyedStateInputFormatTest.java
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/KeyedStateInputFormatTest.java
index ee129d00fc0..c287cdb5598 100644
---
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/KeyedStateInputFormatTest.java
+++
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/KeyedStateInputFormatTest.java
@@ -66,7 +66,7 @@ public class KeyedStateInputFormatTest {
OperatorSubtaskState state =
createOperatorSubtaskState(new StreamFlatMap<>(new
StatefulFunction()));
- OperatorState operatorState = new OperatorState(operatorID, 1, 128);
+ OperatorState operatorState = new OperatorState(null, null,
operatorID, 1, 128);
operatorState.putState(0, state);
KeyedStateInputFormat<?, ?, ?> format =
@@ -87,7 +87,7 @@ public class KeyedStateInputFormatTest {
OperatorSubtaskState state =
createOperatorSubtaskState(new StreamFlatMap<>(new
StatefulFunction()));
- OperatorState operatorState = new OperatorState(operatorID, 1, 128);
+ OperatorState operatorState = new OperatorState(null, null,
operatorID, 1, 128);
operatorState.putState(0, state);
KeyedStateInputFormat<?, ?, ?> format =
@@ -110,7 +110,7 @@ public class KeyedStateInputFormatTest {
OperatorSubtaskState state =
createOperatorSubtaskState(new StreamFlatMap<>(new
StatefulFunction()));
- OperatorState operatorState = new OperatorState(operatorID, 1, 128);
+ OperatorState operatorState = new OperatorState(null, null,
operatorID, 1, 128);
operatorState.putState(0, state);
KeyedStateInputFormat<?, ?, ?> format =
@@ -135,7 +135,7 @@ public class KeyedStateInputFormatTest {
OperatorSubtaskState state =
createOperatorSubtaskState(new StreamFlatMap<>(new
StatefulFunction()));
- OperatorState operatorState = new OperatorState(operatorID, 1, 128);
+ OperatorState operatorState = new OperatorState(null, null,
operatorID, 1, 128);
operatorState.putState(0, state);
KeyedStateInputFormat<?, ?, ?> format =
@@ -161,7 +161,7 @@ public class KeyedStateInputFormatTest {
OperatorSubtaskState state =
createOperatorSubtaskState(new StreamFlatMap<>(new
StatefulFunction()));
- OperatorState operatorState = new OperatorState(operatorID, 1, 128);
+ OperatorState operatorState = new OperatorState(null, null,
operatorID, 1, 128);
operatorState.putState(0, state);
KeyedStateInputFormat<?, ?, ?> format =
@@ -187,7 +187,7 @@ public class KeyedStateInputFormatTest {
OperatorSubtaskState state =
createOperatorSubtaskState(
new KeyedProcessOperator<>(new
StatefulFunctionWithTime()));
- OperatorState operatorState = new OperatorState(operatorID, 1, 128);
+ OperatorState operatorState = new OperatorState(null, null,
operatorID, 1, 128);
operatorState.putState(0, state);
KeyedStateInputFormat<?, ?, ?> format =
@@ -213,7 +213,7 @@ public class KeyedStateInputFormatTest {
throws IOException {
KeyedStateInputFormat<Integer, VoidNamespace, Integer> format =
new KeyedStateInputFormat<>(
- new OperatorState(OperatorIDGenerator.fromUid("uid"),
1, 4),
+ new OperatorState(null, null,
OperatorIDGenerator.fromUid("uid"), 1, 4),
new MemoryStateBackend(),
new Configuration(),
new KeyedStateReaderOperator<>(userFunction,
Types.INT),
diff --git
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/ListStateInputFormatTest.java
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/ListStateInputFormatTest.java
index f9b0d273448..6e837d80997 100644
---
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/ListStateInputFormatTest.java
+++
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/ListStateInputFormatTest.java
@@ -59,7 +59,8 @@ public class ListStateInputFormatTest {
testHarness.processElement(3, 0);
OperatorSubtaskState subtaskState = testHarness.snapshot(0, 0);
- OperatorState state = new
OperatorState(OperatorIDGenerator.fromUid("uid"), 1, 4);
+ OperatorState state =
+ new OperatorState(null, null,
OperatorIDGenerator.fromUid("uid"), 1, 4);
state.putState(0, subtaskState);
OperatorStateInputSplit split =
diff --git
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/StreamOperatorContextBuilderTest.java
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/StreamOperatorContextBuilderTest.java
index 939d0f5a7df..93e6a05464d 100644
---
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/StreamOperatorContextBuilderTest.java
+++
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/StreamOperatorContextBuilderTest.java
@@ -50,7 +50,7 @@ public class StreamOperatorContextBuilderTest {
new StreamOperatorContextBuilder(
context,
configuration,
- new OperatorState(new OperatorID(), 1, 128),
+ new OperatorState(null, null, new OperatorID(), 1,
128),
new PrioritizedOperatorSubtaskStateInputSplit() {
@Override
public PrioritizedOperatorSubtaskState
diff --git
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/UnionStateInputFormatTest.java
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/UnionStateInputFormatTest.java
index 828c787deda..215d2c40325 100644
---
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/UnionStateInputFormatTest.java
+++
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/UnionStateInputFormatTest.java
@@ -60,7 +60,8 @@ public class UnionStateInputFormatTest {
testHarness.processElement(3, 0);
OperatorSubtaskState subtaskState = testHarness.snapshot(0, 0);
- OperatorState state = new
OperatorState(OperatorIDGenerator.fromUid("uid"), 1, 4);
+ OperatorState state =
+ new OperatorState(null, null,
OperatorIDGenerator.fromUid("uid"), 1, 4);
state.putState(0, subtaskState);
OperatorStateInputSplit split =
diff --git
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/WindowReaderTest.java
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/WindowReaderTest.java
index b465b024880..355f28aa532 100644
---
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/WindowReaderTest.java
+++
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/WindowReaderTest.java
@@ -248,7 +248,7 @@ public class WindowReaderTest {
harness.close();
OperatorID operatorID = OperatorIDGenerator.fromUid(UID);
- OperatorState operatorState = new OperatorState(operatorID, 1,
MAX_PARALLELISM);
+ OperatorState operatorState = new OperatorState(null, null,
operatorID, 1, MAX_PARALLELISM);
operatorState.putState(0, state);
return operatorState;
}
diff --git
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SavepointOutputFormatTest.java
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SavepointOutputFormatTest.java
index 51346556e68..bac5fddd70c 100644
---
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SavepointOutputFormatTest.java
+++
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SavepointOutputFormatTest.java
@@ -77,7 +77,8 @@ public class SavepointOutputFormatTest {
}
private CheckpointMetadata createSavepoint() {
- OperatorState operatorState = new
OperatorState(OperatorIDGenerator.fromUid("uid"), 1, 128);
+ OperatorState operatorState =
+ new OperatorState(null, null,
OperatorIDGenerator.fromUid("uid"), 1, 128);
operatorState.putState(0, OperatorSubtaskState.builder().build());
return new CheckpointMetadata(
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/OperatorIDPair.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/OperatorIDPair.java
index 498d68d8e8e..7f3a042de33 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/OperatorIDPair.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/OperatorIDPair.java
@@ -31,28 +31,56 @@ public class OperatorIDPair implements Serializable {
private static final long serialVersionUID = 1L;
private final OperatorID generatedOperatorID;
- private final OperatorID userDefinedOperatorID;
+ @Nullable private final OperatorID userDefinedOperatorID;
+ @Nullable private final String userDefinedOperatorName;
+ @Nullable private final String userDefinedOperatorUid;
private OperatorIDPair(
- OperatorID generatedOperatorID, @Nullable OperatorID
userDefinedOperatorID) {
+ OperatorID generatedOperatorID,
+ @Nullable OperatorID userDefinedOperatorID,
+ @Nullable String userDefinedOperatorName,
+ @Nullable String userDefinedOperatorUid) {
this.generatedOperatorID = generatedOperatorID;
this.userDefinedOperatorID = userDefinedOperatorID;
+ if (userDefinedOperatorName != null &&
userDefinedOperatorName.isEmpty()) {
+ throw new IllegalArgumentException("Empty string operator name is
not allowed");
+ }
+ this.userDefinedOperatorName = userDefinedOperatorName;
+ if (userDefinedOperatorUid != null &&
userDefinedOperatorUid.isEmpty()) {
+ throw new IllegalArgumentException("Empty string operator uid is
not allowed");
+ }
+ this.userDefinedOperatorUid = userDefinedOperatorUid;
}
public static OperatorIDPair of(
- OperatorID generatedOperatorID, @Nullable OperatorID
userDefinedOperatorID) {
- return new OperatorIDPair(generatedOperatorID, userDefinedOperatorID);
+ OperatorID generatedOperatorID,
+ @Nullable OperatorID userDefinedOperatorID,
+ @Nullable String operatorName,
+ @Nullable String operatorUid) {
+ return new OperatorIDPair(
+ generatedOperatorID, userDefinedOperatorID, operatorName,
operatorUid);
}
public static OperatorIDPair generatedIDOnly(OperatorID
generatedOperatorID) {
- return new OperatorIDPair(generatedOperatorID, null);
+ return new OperatorIDPair(generatedOperatorID, null, null, null);
}
public OperatorID getGeneratedOperatorID() {
return generatedOperatorID;
}
+ /** Returns the user-defined operator ID, or an empty {@link Optional} if there is none. */
public Optional<OperatorID> getUserDefinedOperatorID() {
return Optional.ofNullable(userDefinedOperatorID);
}
+
+ @Nullable
+ public String getUserDefinedOperatorName() {
+ return userDefinedOperatorName;
+ }
+
+ @Nullable
+ public String getUserDefinedOperatorUid() {
+ return userDefinedOperatorUid;
+ }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
index f445727e6ef..682e9fdcea1 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
@@ -24,7 +24,7 @@ import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.checkpoint.metadata.MetadataSerializer;
import org.apache.flink.runtime.checkpoint.metadata.MetadataSerializers;
-import org.apache.flink.runtime.checkpoint.metadata.MetadataV4Serializer;
+import org.apache.flink.runtime.checkpoint.metadata.MetadataV5Serializer;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -85,7 +85,7 @@ public class Checkpoints {
public static void storeCheckpointMetadata(
CheckpointMetadata checkpointMetadata, DataOutputStream out)
throws IOException {
- storeCheckpointMetadata(checkpointMetadata, out,
MetadataV4Serializer.INSTANCE);
+ storeCheckpointMetadata(checkpointMetadata, out,
MetadataV5Serializer.INSTANCE);
}
public static void storeCheckpointMetadata(
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlan.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlan.java
index 9253799a17a..44e81b3de7a 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlan.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlan.java
@@ -228,19 +228,21 @@ public class DefaultCheckpointPlan implements
CheckpointPlan {
Map<OperatorID, OperatorState> operatorStates) {
// Completes the operator state for the fully finished operators
for (ExecutionJobVertex jobVertex :
fullyFinishedOrFinishedOnRestoreVertices.values()) {
- for (OperatorIDPair operatorID : jobVertex.getOperatorIDs()) {
+ for (OperatorIDPair operatorIDPair : jobVertex.getOperatorIDs()) {
OperatorState operatorState =
-
operatorStates.get(operatorID.getGeneratedOperatorID());
+
operatorStates.get(operatorIDPair.getGeneratedOperatorID());
checkState(
operatorState == null ||
!operatorState.hasSubtaskStates(),
"There should be no states or only coordinator state
reported for fully finished operators");
operatorState =
new FullyFinishedOperatorState(
- operatorID.getGeneratedOperatorID(),
+ operatorIDPair.getUserDefinedOperatorName(),
+ operatorIDPair.getUserDefinedOperatorUid(),
+ operatorIDPair.getGeneratedOperatorID(),
jobVertex.getParallelism(),
jobVertex.getMaxParallelism());
- operatorStates.put(operatorID.getGeneratedOperatorID(),
operatorState);
+ operatorStates.put(operatorIDPair.getGeneratedOperatorID(),
operatorState);
}
}
}
@@ -260,6 +262,8 @@ public class DefaultCheckpointPlan implements
CheckpointPlan {
if (operatorState == null) {
operatorState =
new OperatorState(
+
operatorIDPair.getUserDefinedOperatorName(),
+ operatorIDPair.getUserDefinedOperatorUid(),
operatorIDPair.getGeneratedOperatorID(),
jobVertex.getParallelism(),
jobVertex.getMaxParallelism());
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FullyFinishedOperatorState.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FullyFinishedOperatorState.java
index 1726ec93c15..cd405a6cd2b 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FullyFinishedOperatorState.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FullyFinishedOperatorState.java
@@ -31,8 +31,13 @@ public class FullyFinishedOperatorState extends
OperatorState {
private static final long serialVersionUID = 1L;
- public FullyFinishedOperatorState(OperatorID operatorID, int parallelism,
int maxParallelism) {
- super(operatorID, parallelism, maxParallelism);
+ public FullyFinishedOperatorState(
+ @Nullable String operatorName,
+ @Nullable String operatorUid,
+ OperatorID operatorID,
+ int parallelism,
+ int maxParallelism) {
+ super(operatorName, operatorUid, operatorID, parallelism,
maxParallelism);
}
@Override
@@ -55,7 +60,11 @@ public class FullyFinishedOperatorState extends
OperatorState {
@Override
public OperatorState copyAndDiscardInFlightData() {
return new FullyFinishedOperatorState(
- getOperatorID(), getParallelism(), getMaxParallelism());
+ getOperatorName().orElse(null),
+ getOperatorUid().orElse(null),
+ getOperatorID(),
+ getParallelism(),
+ getMaxParallelism());
}
@Override
@@ -75,6 +84,10 @@ public class FullyFinishedOperatorState extends
OperatorState {
@Override
public String toString() {
return "FullyFinishedOperatorState("
- + "operatorID: "
+ + "name: "
+ + getOperatorName()
+ + ", uid: "
+ + getOperatorUid()
+ + ", operatorID: "
+ getOperatorID()
+ ", parallelism: "
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
index d377ca5ec7b..1707ed93800 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
@@ -33,6 +33,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -47,6 +48,12 @@ public class OperatorState implements CompositeStateHandle {
private static final long serialVersionUID = -4845578005863201810L;
+ /** The name of the operator. */
+ @Nullable private String operatorName;
+
+ /** The Uid of the operator. */
+ @Nullable private String operatorUid;
+
/** The id of the operator. */
private final OperatorID operatorID;
@@ -65,7 +72,12 @@ public class OperatorState implements CompositeStateHandle {
*/
private final int maxParallelism;
- public OperatorState(OperatorID operatorID, int parallelism, int
maxParallelism) {
+ public OperatorState(
+ @Nullable String operatorName,
+ @Nullable String operatorUid,
+ OperatorID operatorID,
+ int parallelism,
+ int maxParallelism) {
if (parallelism > maxParallelism) {
throw new IllegalArgumentException(
String.format(
@@ -73,6 +85,8 @@ public class OperatorState implements CompositeStateHandle {
parallelism, maxParallelism));
}
+ this.operatorName = operatorName;
+ this.operatorUid = operatorUid;
this.operatorID = operatorID;
this.operatorSubtaskStates =
CollectionUtil.newHashMapWithExpectedSize(parallelism);
@@ -81,6 +95,22 @@ public class OperatorState implements CompositeStateHandle {
this.maxParallelism = maxParallelism;
}
+ public Optional<String> getOperatorName() {
+ return Optional.ofNullable(operatorName);
+ }
+
+ public void setOperatorName(@Nullable String operatorName) {
+ this.operatorName = operatorName;
+ }
+
+ public Optional<String> getOperatorUid() {
+ return Optional.ofNullable(operatorUid);
+ }
+
+ public void setOperatorUid(@Nullable String operatorUid) {
+ this.operatorUid = operatorUid;
+ }
+
public OperatorID getOperatorID() {
return operatorID;
}
@@ -145,14 +175,18 @@ public class OperatorState implements
CompositeStateHandle {
return maxParallelism;
}
- public OperatorState copyWithNewOperatorID(OperatorID newOperatorId) {
- OperatorState newState = new OperatorState(newOperatorId, parallelism,
maxParallelism);
+ public OperatorState copyWithNewIDs(@Nullable String newOperatorUid,
OperatorID newOperatorId) {
+ OperatorState newState =
+ new OperatorState(
+ operatorName, newOperatorUid, newOperatorId,
parallelism, maxParallelism);
operatorSubtaskStates.forEach(newState::putState);
return newState;
}
public OperatorState copyAndDiscardInFlightData() {
- OperatorState newState = new OperatorState(operatorID, parallelism,
maxParallelism);
+ OperatorState newState =
+ new OperatorState(
+ operatorName, operatorUid, operatorID, parallelism,
maxParallelism);
for (Map.Entry<Integer, OperatorSubtaskState>
originalSubtaskStateEntry :
operatorSubtaskStates.entrySet()) {
@@ -256,7 +290,11 @@ public class OperatorState implements CompositeStateHandle
{
// KvStates are always null in 1.1. Don't print this as it might
// confuse users that don't care about how we store it internally.
return "OperatorState("
- + "operatorID: "
+ + "name: "
+ + getOperatorName()
+ + ", uid: "
+ + getOperatorUid()
+ + ", operatorID: "
+ operatorID
+ ", parallelism: "
+ parallelism
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 2f2cc6f6f67..7712d905d03 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -407,9 +407,8 @@ public class PendingCheckpoint implements Checkpoint {
if (operatorSubtaskStates != null &&
operatorSubtaskStates.isTaskDeployedAsFinished()) {
checkpointPlan.reportTaskFinishedOnRestore(vertex);
} else {
- List<OperatorIDPair> operatorIDs =
vertex.getJobVertex().getOperatorIDs();
- for (OperatorIDPair operatorID : operatorIDs) {
- updateOperatorState(vertex, operatorSubtaskStates,
operatorID);
+ for (OperatorIDPair operatorIDPair :
vertex.getJobVertex().getOperatorIDs()) {
+ updateOperatorState(vertex, operatorSubtaskStates,
operatorIDPair);
}
if (operatorSubtaskStates != null &&
operatorSubtaskStates.isTaskFinished()) {
@@ -464,22 +463,27 @@ public class PendingCheckpoint implements Checkpoint {
private void updateOperatorState(
ExecutionVertex vertex,
TaskStateSnapshot operatorSubtaskStates,
- OperatorIDPair operatorID) {
- OperatorState operatorState =
operatorStates.get(operatorID.getGeneratedOperatorID());
+ OperatorIDPair operatorIDPair) {
+ OperatorState operatorState =
operatorStates.get(operatorIDPair.getGeneratedOperatorID());
if (operatorState == null) {
operatorState =
new OperatorState(
- operatorID.getGeneratedOperatorID(),
+ operatorIDPair.getUserDefinedOperatorName(),
+ operatorIDPair.getUserDefinedOperatorUid(),
+ operatorIDPair.getGeneratedOperatorID(),
vertex.getTotalNumberOfParallelSubtasks(),
vertex.getMaxParallelism());
- operatorStates.put(operatorID.getGeneratedOperatorID(),
operatorState);
+ operatorStates.put(operatorIDPair.getGeneratedOperatorID(),
operatorState);
+ } else {
+
operatorState.setOperatorName(operatorIDPair.getUserDefinedOperatorName());
+
operatorState.setOperatorUid(operatorIDPair.getUserDefinedOperatorUid());
}
OperatorSubtaskState operatorSubtaskState =
operatorSubtaskStates == null
? null
: operatorSubtaskStates.getSubtaskStateByOperatorID(
- operatorID.getGeneratedOperatorID());
+ operatorIDPair.getGeneratedOperatorID());
if (operatorSubtaskState != null) {
operatorState.putState(vertex.getParallelSubtaskIndex(),
operatorSubtaskState);
@@ -507,6 +511,8 @@ public class PendingCheckpoint implements Checkpoint {
if (operatorState == null) {
operatorState =
new OperatorState(
+ null,
+ null,
operatorId,
coordinatorInfo.currentParallelism(),
coordinatorInfo.maxParallelism());
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
index 07088d901f6..917e81f6d49 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
@@ -119,6 +119,8 @@ public class StateAssignmentOperation {
if (operatorState == null) {
operatorState =
new OperatorState(
+
operatorIDPair.getUserDefinedOperatorName(),
+ operatorIDPair.getUserDefinedOperatorUid(),
operatorID,
executionJobVertex.getParallelism(),
executionJobVertex.getMaxParallelism());
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataSerializers.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataSerializers.java
index ee370b009b9..8646c48e1cf 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataSerializers.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataSerializers.java
@@ -30,13 +30,14 @@ import java.util.Map;
public class MetadataSerializers {
private static final Map<Integer, MetadataSerializer> SERIALIZERS =
- CollectionUtil.newHashMapWithExpectedSize(4);
+ CollectionUtil.newHashMapWithExpectedSize(5);
static {
registerSerializer(MetadataV1Serializer.INSTANCE);
registerSerializer(MetadataV2Serializer.INSTANCE);
registerSerializer(MetadataV3Serializer.INSTANCE);
registerSerializer(MetadataV4Serializer.INSTANCE);
+ registerSerializer(MetadataV5Serializer.INSTANCE);
}
private static void registerSerializer(MetadataSerializer serializer) {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2Serializer.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2Serializer.java
index 49f6f3a3e2e..c993acb1c4d 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2Serializer.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2Serializer.java
@@ -118,7 +118,8 @@ public class MetadataV2Serializer extends
MetadataV2V3SerializerBase implements
dis.readInt();
// Add task state
- final OperatorState taskState = new OperatorState(jobVertexId,
parallelism, maxParallelism);
+ final OperatorState taskState =
+ new OperatorState(null, null, jobVertexId, parallelism,
maxParallelism);
// Sub task states
final int numSubTaskStates = dis.readInt();
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3Serializer.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3Serializer.java
index 83b5201e5ec..fc6df97ce8d 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3Serializer.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3Serializer.java
@@ -66,7 +66,7 @@ public class MetadataV3Serializer extends
MetadataV2V3SerializerBase implements
new ChannelStateHandleSerializer();
/** Singleton, not meant to be instantiated. */
- private MetadataV3Serializer() {}
+ protected MetadataV3Serializer() {}
@Override
public int getVersion() {
@@ -80,7 +80,7 @@ public class MetadataV3Serializer extends
MetadataV2V3SerializerBase implements
@Override
public void serialize(CheckpointMetadata checkpointMetadata,
DataOutputStream dos)
throws IOException {
- INSTANCE.serializeMetadata(checkpointMetadata, dos);
+ serializeMetadata(checkpointMetadata, dos);
}
@Override
@@ -163,11 +163,12 @@ public class MetadataV3Serializer extends
MetadataV2V3SerializerBase implements
checkState(
coordinateState == null,
"Coordinator State should be null for fully finished
operator state");
- return new FullyFinishedOperatorState(jobVertexId, parallelism,
maxParallelism);
+ return new FullyFinishedOperatorState(
+ null, null, jobVertexId, parallelism, maxParallelism);
}
final OperatorState operatorState =
- new OperatorState(jobVertexId, parallelism, maxParallelism);
+ new OperatorState(null, null, jobVertexId, parallelism,
maxParallelism);
// Coordinator state
operatorState.setCoordinatorState(coordinateState);
@@ -189,7 +190,7 @@ public class MetadataV3Serializer extends
MetadataV2V3SerializerBase implements
return operatorState;
}
- private SubtaskAndFinishedState
deserializeSubtaskIndexAndFinishedState(DataInputStream dis)
+ protected SubtaskAndFinishedState
deserializeSubtaskIndexAndFinishedState(DataInputStream dis)
throws IOException {
int storedSubtaskIndex = dis.readInt();
if (storedSubtaskIndex < 0) {
@@ -306,7 +307,7 @@ public class MetadataV3Serializer extends
MetadataV2V3SerializerBase implements
return INSTANCE.deserializeResultSubpartitionStateHandle(dis, null);
}
- private static class SubtaskAndFinishedState {
+ static class SubtaskAndFinishedState {
final int subtaskIndex;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV4Serializer.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV4Serializer.java
index 758e03a7ecb..88262fa805a 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV4Serializer.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV4Serializer.java
@@ -31,7 +31,7 @@ import java.io.ObjectOutputStream;
* serialization.
*/
@Internal
-public class MetadataV4Serializer implements MetadataSerializer {
+public class MetadataV4Serializer extends MetadataV3Serializer {
public static final MetadataSerializer INSTANCE = new
MetadataV4Serializer();
public static final int VERSION = 4;
@@ -45,15 +45,14 @@ public class MetadataV4Serializer implements
MetadataSerializer {
public CheckpointMetadata deserialize(
DataInputStream dis, ClassLoader userCodeClassLoader, String
externalPointer)
throws IOException {
- return MetadataV3Serializer.INSTANCE
- .deserialize(dis, userCodeClassLoader, externalPointer)
+ return super.deserialize(dis, userCodeClassLoader, externalPointer)
.withProperties(deserializeProperties(dis));
}
@Override
public void serialize(CheckpointMetadata checkpointMetadata,
DataOutputStream dos)
throws IOException {
- MetadataV3Serializer.INSTANCE.serialize(checkpointMetadata, dos);
+ super.serialize(checkpointMetadata, dos);
serializeProperties(checkpointMetadata.getCheckpointProperties(), dos);
}
@@ -66,7 +65,7 @@ public class MetadataV4Serializer implements
MetadataSerializer {
}
}
- private static void serializeProperties(CheckpointProperties properties,
DataOutputStream dos)
+ private void serializeProperties(CheckpointProperties properties,
DataOutputStream dos)
throws IOException {
new ObjectOutputStream(dos).writeObject(properties); // closed outside
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV5Serializer.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV5Serializer.java
new file mode 100644
index 00000000000..ca0d568ccf4
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV5Serializer.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.metadata;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.FinishedOperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.FullyFinishedOperatorState;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+
+import javax.annotation.Nullable;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** V5 serializer that adds Operator name and uid. */
+@Internal
+public class MetadataV5Serializer extends MetadataV4Serializer {
+
+ public static final MetadataSerializer INSTANCE = new
MetadataV5Serializer();
+ public static final int VERSION = 5;
+
+ @Override
+ public int getVersion() {
+ return VERSION;
+ }
+
+ @Override
+ protected void serializeOperatorState(OperatorState operatorState,
DataOutputStream dos)
+ throws IOException {
+ if (operatorState.getOperatorName().isPresent()
+ && operatorState.getOperatorName().get().isEmpty()) {
+ throw new IllegalArgumentException("Empty string operator name is
not allowed");
+ }
+ if (operatorState.getOperatorUid().isPresent()
+ && operatorState.getOperatorUid().get().isEmpty()) {
+ throw new IllegalArgumentException("Empty string operator uid is
not allowed");
+ }
+ // Name and UID are null in the whole chain when not provided, but since only non-null
+ // strings can be written to the metadata, null is stored as the empty string here
+ dos.writeUTF(operatorState.getOperatorName().orElse(""));
+ dos.writeUTF(operatorState.getOperatorUid().orElse(""));
+ super.serializeOperatorState(operatorState, dos);
+ }
+
+ @Override
+ protected OperatorState deserializeOperatorState(
+ DataInputStream dis, @Nullable DeserializationContext context)
throws IOException {
+ final String name = dis.readUTF();
+ final String uid = dis.readUTF();
+
+ final OperatorID jobVertexId = new OperatorID(dis.readLong(),
dis.readLong());
+ final int parallelism = dis.readInt();
+ final int maxParallelism = dis.readInt();
+
+ ByteStreamStateHandle coordinateState =
+ deserializeAndCheckByteStreamStateHandle(dis, context);
+
+ final int numSubTaskStates = dis.readInt();
+ if (numSubTaskStates < 0) {
+ checkState(
+ coordinateState == null,
+ "Coordinator State should be null for fully finished
operator state");
+ return new FullyFinishedOperatorState(
+ name.isEmpty() ? null : name,
+ uid.isEmpty() ? null : uid,
+ jobVertexId,
+ parallelism,
+ maxParallelism);
+ }
+
+ // Name and UID are null in the whole chain when not provided, so the empty strings read
+ // from the metadata are converted back to null here
+ final OperatorState operatorState =
+ new OperatorState(
+ name.isEmpty() ? null : name,
+ uid.isEmpty() ? null : uid,
+ jobVertexId,
+ parallelism,
+ maxParallelism);
+
+ // Coordinator state
+ operatorState.setCoordinatorState(coordinateState);
+
+ // Sub task states
+ for (int j = 0; j < numSubTaskStates; j++) {
+ SubtaskAndFinishedState subtaskAndFinishedState =
+ deserializeSubtaskIndexAndFinishedState(dis);
+ if (subtaskAndFinishedState.isFinished) {
+ operatorState.putState(
+ subtaskAndFinishedState.subtaskIndex,
+ FinishedOperatorSubtaskState.INSTANCE);
+ } else {
+ final OperatorSubtaskState subtaskState =
deserializeSubtaskState(dis, context);
+ operatorState.putState(subtaskAndFinishedState.subtaskIndex,
subtaskState);
+ }
+ }
+
+ return operatorState;
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 358c62d8610..7fd640c49cc 100644
---
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -984,16 +984,21 @@ public class StreamingJobGraphGenerator {
JobVertexID jobVertexId = new JobVertexID(hash);
- List<Tuple2<byte[], byte[]>> chainedOperators =
+ List<ChainedOperatorHashInfo> chainedOperators =
chainInfo.getChainedOperatorHashes(streamNodeId);
List<OperatorIDPair> operatorIDPairs = new ArrayList<>();
if (chainedOperators != null) {
- for (Tuple2<byte[], byte[]> chainedOperator : chainedOperators) {
+ for (ChainedOperatorHashInfo chainedOperator : chainedOperators) {
OperatorID userDefinedOperatorID =
- chainedOperator.f1 == null ? null : new
OperatorID(chainedOperator.f1);
+ chainedOperator.getUserDefinedOperatorID() == null
+ ? null
+ : new
OperatorID(chainedOperator.getUserDefinedOperatorID());
operatorIDPairs.add(
OperatorIDPair.of(
- new OperatorID(chainedOperator.f0),
userDefinedOperatorID));
+ new
OperatorID(chainedOperator.getGeneratedOperatorID()),
+ userDefinedOperatorID,
+
chainedOperator.getStreamNode().getOperatorName(),
+
chainedOperator.getStreamNode().getTransformationUID()));
}
}
@@ -2109,7 +2114,7 @@ public class StreamingJobGraphGenerator {
private final Integer startNodeId;
private final Map<Integer, byte[]> hashes;
private final List<Map<Integer, byte[]>> legacyHashes;
- private final Map<Integer, List<Tuple2<byte[], byte[]>>>
chainedOperatorHashes;
+ private final Map<Integer, List<ChainedOperatorHashInfo>>
chainedOperatorHashes;
private final Map<Integer, ChainedSourceInfo> chainedSources;
private final List<OperatorCoordinator.Provider> coordinatorProviders;
private final StreamGraph streamGraph;
@@ -2141,7 +2146,7 @@ public class StreamingJobGraphGenerator {
return startNodeId;
}
- private List<Tuple2<byte[], byte[]>> getChainedOperatorHashes(int
startNodeId) {
+ private List<ChainedOperatorHashInfo> getChainedOperatorHashes(int
startNodeId) {
return chainedOperatorHashes.get(startNodeId);
}
@@ -2161,13 +2166,15 @@ public class StreamingJobGraphGenerator {
recordChainedNode(currentNodeId);
StreamNode streamNode = streamGraph.getStreamNode(currentNodeId);
- List<Tuple2<byte[], byte[]>> operatorHashes =
+ List<ChainedOperatorHashInfo> operatorHashes =
chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());
byte[] primaryHashBytes = hashes.get(currentNodeId);
for (Map<Integer, byte[]> legacyHash : legacyHashes) {
- operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));
+ operatorHashes.add(
+ new ChainedOperatorHashInfo(
+ primaryHashBytes, legacyHash.get(currentNodeId), streamNode));
}
streamNode
@@ -2200,6 +2207,33 @@ public class StreamingJobGraphGenerator {
}
}
+ private static final class ChainedOperatorHashInfo {
+ private final byte[] generatedOperatorID;
+ private final byte[] userDefinedOperatorID;
+ private final StreamNode streamNode;
+
+ ChainedOperatorHashInfo(
+ final byte[] generatedOperatorID,
+ final byte[] userDefinedOperatorID,
+ final StreamNode streamNode) {
+ this.generatedOperatorID = generatedOperatorID;
+ this.userDefinedOperatorID = userDefinedOperatorID;
+ this.streamNode = streamNode;
+ }
+
+ public byte[] getGeneratedOperatorID() {
+ return generatedOperatorID;
+ }
+
+ public byte[] getUserDefinedOperatorID() {
+ return userDefinedOperatorID;
+ }
+
+ public StreamNode getStreamNode() {
+ return streamNode;
+ }
+ }
+
private static final class ChainedSourceInfo {
private final StreamConfig operatorConfig;
private final StreamConfig.SourceInputConfig inputConfig;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/OperatorIDPairTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/OperatorIDPairTest.java
new file mode 100644
index 00000000000..db35952daa9
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/OperatorIDPairTest.java
@@ -0,0 +1,28 @@
+package org.apache.flink.runtime;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertThrows;
+
+class OperatorIDPairTest {
+ @Test
+ void testEmptyNameShouldThrowException() {
+ final IllegalArgumentException exception =
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> OperatorIDPair.of(new OperatorID(), null, "", null));
+ assertThat(exception.getMessage()).contains("Empty string operator name is not allowed");
+ }
+
+ @Test
+ void testEmptyUidShouldThrowException() {
+ final IllegalArgumentException exception =
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> OperatorIDPair.of(new OperatorID(), null, null, ""));
+ assertThat(exception.getMessage()).contains("Empty string operator uid is not allowed");
+ }
+}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
index 1dd9f2f2b69..83627510b1e 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
@@ -661,7 +661,8 @@ class CheckpointCoordinatorRestoringTest {
// prepare vertex1 state
for (Tuple2<JobVertexID, OperatorID> id : Arrays.asList(id1, id2)) {
- OperatorState taskState = new OperatorState(id.f1, parallelism1,
maxParallelism1);
+ OperatorState taskState =
+ new OperatorState(null, null, id.f1, parallelism1,
maxParallelism1);
operatorStates.put(id.f1, taskState);
for (int index = 0; index < taskState.getParallelism(); index++) {
OperatorSubtaskState subtaskState =
@@ -681,7 +682,8 @@ class CheckpointCoordinatorRestoringTest {
new ArrayList<>();
// prepare vertex2 state
for (Tuple2<JobVertexID, OperatorID> id : Arrays.asList(id3, id4)) {
- OperatorState operatorState = new OperatorState(id.f1,
parallelism2, maxParallelism2);
+ OperatorState operatorState =
+ new OperatorState(null, null, id.f1, parallelism2,
maxParallelism2);
operatorStates.put(id.f1, operatorState);
List<ChainedStateHandle<OperatorStateHandle>>
expectedManagedOperatorState =
new ArrayList<>();
@@ -1085,7 +1087,7 @@ class CheckpointCoordinatorRestoringTest {
Map<OperatorID, OperatorState> operatorStates = new HashMap<>();
operatorStates.put(
op1.getGeneratedOperatorID(),
- new FullyFinishedOperatorState(op1.getGeneratedOperatorID(),
1, 1));
+ new FullyFinishedOperatorState(null, null,
op1.getGeneratedOperatorID(), 1, 1));
CompletedCheckpoint completedCheckpoint =
new CompletedCheckpoint(
graph.getJobID(),
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
index 92faa1438a6..212930044fa 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
@@ -168,7 +168,7 @@ class CheckpointMetadataLoadingTest {
final int maxParallelism = 1234;
final OperatorState state =
- new OperatorState(operatorID, maxParallelism / 2,
maxParallelism);
+ new OperatorState(null, null, operatorID, maxParallelism / 2,
maxParallelism);
state.setCoordinatorState(new
ByteStreamStateHandle("coordinatorState", new byte[0]));
final CompletedCheckpointStorageLocation testSavepoint =
@@ -226,7 +226,8 @@ class CheckpointMetadataLoadingTest {
singleton(createNewResultSubpartitionStateHandle(10, rnd)))
.build();
- final OperatorState state = new OperatorState(operatorId, parallelism,
parallelism);
+ final OperatorState state =
+ new OperatorState(null, null, operatorId, parallelism,
parallelism);
state.putState(0, subtaskState);
return createSavepointWithOperatorState(checkpointId, state);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index ebcfc0f5afe..4b52d1007f0 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -232,7 +232,7 @@ class CheckpointStateRestoreTest {
// --- (2) Checkpoint misses state for a jobVertex (should work) ---
Map<OperatorID, OperatorState> checkpointTaskStates = new HashMap<>();
{
- OperatorState taskState = new OperatorState(operatorId1, 3, 3);
+ OperatorState taskState = new OperatorState(null, null,
operatorId1, 3, 3);
taskState.putState(0, OperatorSubtaskState.builder().build());
taskState.putState(1, OperatorSubtaskState.builder().build());
taskState.putState(2, OperatorSubtaskState.builder().build());
@@ -264,7 +264,7 @@ class CheckpointStateRestoreTest {
// There is no task for this
{
- OperatorState taskState = new OperatorState(newOperatorID, 1, 1);
+ OperatorState taskState = new OperatorState(null, null,
newOperatorID, 1, 1);
taskState.putState(0, OperatorSubtaskState.builder().build());
checkpointTaskStates.put(newOperatorID, taskState);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
index 376f251a3d7..05a088e551b 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
@@ -231,7 +231,8 @@ abstract class CompletedCheckpointStoreTest {
OperatorID operatorID = new OperatorID();
Map<OperatorID, OperatorState> operatorGroupState = new HashMap<>();
- OperatorState operatorState = new OperatorState(operatorID,
numberOfStates, numberOfStates);
+ OperatorState operatorState =
+ new OperatorState(null, null, operatorID, numberOfStates,
numberOfStates);
operatorGroupState.put(operatorID, operatorState);
for (int i = 0; i < numberOfStates; i++) {
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlanTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlanTest.java
index 2f08e0c36a6..3065a22b3f2 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlanTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlanTest.java
@@ -75,7 +75,7 @@ class DefaultCheckpointPlanTest {
CheckpointPlan checkpointPlan = createCheckpointPlan(executionGraph);
Map<OperatorID, OperatorState> operatorStates = new HashMap<>();
- OperatorState operatorState = new OperatorState(operatorId, 2, 2);
+ OperatorState operatorState = new OperatorState(null, null,
operatorId, 2, 2);
operatorState.putState(
0,
createSubtaskStateWithUnionListState(TempDirUtils.newFile(temporaryFolder)));
operatorStates.put(operatorId, operatorState);
@@ -109,7 +109,7 @@ class DefaultCheckpointPlanTest {
CheckpointPlan checkpointPlan = createCheckpointPlan(executionGraph);
Map<OperatorID, OperatorState> operatorStates = new HashMap<>();
- OperatorState operatorState = new OperatorState(operatorId, 2, 2);
+ OperatorState operatorState = new OperatorState(null, null,
operatorId, 2, 2);
operatorState.putState(
0,
createSubtaskStateWithUnionListState(TempDirUtils.newFile(temporaryFolder)));
@@ -210,7 +210,7 @@ class DefaultCheckpointPlanTest {
CheckpointPlan checkpointPlan = createCheckpointPlan(executionGraph);
Map<OperatorID, OperatorState> operatorStates = new HashMap<>();
- OperatorState operatorState = new OperatorState(finishedOperatorID, 1,
256);
+ OperatorState operatorState = new OperatorState(null, null,
finishedOperatorID, 1, 256);
operatorState.setCoordinatorState(new TestingStreamStateHandle());
operatorStates.put(finishedOperatorID, operatorState);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FullyFinishedOperatorStateTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FullyFinishedOperatorStateTest.java
index 3dd2ef9e436..5b49be69fe6 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FullyFinishedOperatorStateTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FullyFinishedOperatorStateTest.java
@@ -40,7 +40,8 @@ class FullyFinishedOperatorStateTest {
@Test
void testFullyFinishedOperatorState() {
- OperatorState operatorState = new FullyFinishedOperatorState(new
OperatorID(), 5, 256);
+ OperatorState operatorState =
+ new FullyFinishedOperatorState(null, null, new OperatorID(),
5, 256);
assertThat(operatorState.isFullyFinished()).isTrue();
assertThat(operatorState.getSubtaskStates()).isEmpty();
@@ -66,7 +67,7 @@ class FullyFinishedOperatorStateTest {
Tuple2<List<StateObject>, OperatorSubtaskState> opSubtaskStates2 =
generateSampleOperatorSubtaskState();
- OperatorState operatorState = new OperatorState(new OperatorID(), 2,
256);
+ OperatorState operatorState = new OperatorState(null, null, new
OperatorID(), 2, 256);
operatorState.putState(0, opSubtaskStates1.f1);
operatorState.putState(1, opSubtaskStates2.f1);
ByteStreamStateHandle coordinatorState =
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index e530edef9b1..4dbc22dc34a 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -225,7 +225,7 @@ class PendingCheckpointTest {
false, CheckpointType.CHECKPOINT, false, false, false,
false, false, false);
QueueExecutor executor = new QueueExecutor();
- OperatorState state = new OperatorState(new OperatorID(), 1, 256);
+ OperatorState state = new OperatorState(null, null, new OperatorID(),
1, 256);
OperatorSubtaskStateMock subtaskStateMock = new
OperatorSubtaskStateMock();
OperatorSubtaskState subtaskState = subtaskStateMock.getSubtaskState();
state.putState(0, subtaskState);
@@ -287,7 +287,7 @@ class PendingCheckpointTest {
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
pending.acknowledgeTask(ATTEMPT_ID, null,
mock(CheckpointMetrics.class));
final OperatorState expectedState =
- new OperatorState(OPERATOR_ID, PARALLELISM, MAX_PARALLELISM);
+ new OperatorState(null, null, OPERATOR_ID, PARALLELISM,
MAX_PARALLELISM);
assertThat(Collections.singletonMap(OPERATOR_ID, expectedState))
.isEqualTo(pending.getOperatorStates());
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java
index dc05fa91de9..fd08793e402 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java
@@ -100,7 +100,7 @@ class StateAssignmentOperationTest {
@Test
void testRepartitionSplitDistributeStates() {
OperatorID operatorID = new OperatorID();
- OperatorState operatorState = new OperatorState(operatorID, 2, 4);
+ OperatorState operatorState = new OperatorState(null, null,
operatorID, 2, 4);
Map<String, OperatorStateHandle.StateMetaInfo> metaInfoMap1 = new
HashMap<>(1);
metaInfoMap1.put(
@@ -130,7 +130,7 @@ class StateAssignmentOperationTest {
@Test
void testRepartitionUnionState() {
OperatorID operatorID = new OperatorID();
- OperatorState operatorState = new OperatorState(operatorID, 2, 4);
+ OperatorState operatorState = new OperatorState(null, null,
operatorID, 2, 4);
Map<String, OperatorStateHandle.StateMetaInfo> metaInfoMap1 = new
HashMap<>(2);
metaInfoMap1.put(
@@ -164,7 +164,7 @@ class StateAssignmentOperationTest {
@Test
void testRepartitionBroadcastState() {
OperatorID operatorID = new OperatorID();
- OperatorState operatorState = new OperatorState(operatorID, 2, 4);
+ OperatorState operatorState = new OperatorState(null, null,
operatorID, 2, 4);
Map<String, OperatorStateHandle.StateMetaInfo> metaInfoMap1 = new
HashMap<>(2);
metaInfoMap1.put(
@@ -202,7 +202,7 @@ class StateAssignmentOperationTest {
@Test
void testRepartitionBroadcastStateWithNullSubtaskState() {
OperatorID operatorID = new OperatorID();
- OperatorState operatorState = new OperatorState(operatorID, 2, 4);
+ OperatorState operatorState = new OperatorState(null, null,
operatorID, 2, 4);
// Only the subtask 0 reports the states.
Map<String, OperatorStateHandle.StateMetaInfo> metaInfoMap1 = new
HashMap<>(2);
@@ -226,7 +226,7 @@ class StateAssignmentOperationTest {
@Test
void testRepartitionBroadcastStateWithEmptySubtaskState() {
OperatorID operatorID = new OperatorID();
- OperatorState operatorState = new OperatorState(operatorID, 2, 4);
+ OperatorState operatorState = new OperatorState(null, null,
operatorID, 2, 4);
// Only the subtask 0 reports the states.
Map<String, OperatorStateHandle.StateMetaInfo> metaInfoMap1 = new
HashMap<>(2);
@@ -254,7 +254,7 @@ class StateAssignmentOperationTest {
@Test
void testReDistributeCombinedPartitionableStates() {
OperatorID operatorID = new OperatorID();
- OperatorState operatorState = new OperatorState(operatorID, 2, 4);
+ OperatorState operatorState = new OperatorState(null, null,
operatorID, 2, 4);
Map<String, OperatorStateHandle.StateMetaInfo> metaInfoMap1 = new
HashMap<>(6);
metaInfoMap1.put(
@@ -808,7 +808,7 @@ class StateAssignmentOperationTest {
List<OperatorID> operatorIds = buildOperatorIds(2);
Map<OperatorID, OperatorState> states = new HashMap<>();
Random random = new Random();
- OperatorState upstreamState = new OperatorState(operatorIds.get(0), 2,
MAX_P);
+ OperatorState upstreamState = new OperatorState(null, null,
operatorIds.get(0), 2, MAX_P);
OperatorSubtaskState state =
OperatorSubtaskState.builder()
.setResultSubpartitionState(
@@ -894,9 +894,9 @@ class StateAssignmentOperationTest {
List<OperatorID> operatorIds = buildOperatorIds(2);
Map<OperatorID, OperatorState> states = new HashMap<>();
OperatorState upstreamState =
- new OperatorState(operatorIds.get(0), upstreamParallelism,
MAX_P);
+ new OperatorState(null, null, operatorIds.get(0),
upstreamParallelism, MAX_P);
OperatorState downstreamState =
- new OperatorState(operatorIds.get(1), downstreamParallelism,
MAX_P);
+ new OperatorState(null, null, operatorIds.get(1),
downstreamParallelism, MAX_P);
states.put(operatorIds.get(0), upstreamState);
states.put(operatorIds.get(1), downstreamState);
@@ -971,7 +971,8 @@ class StateAssignmentOperationTest {
buildOperatorStates(Collections.singletonList(operatorIds.get(1)), 3);
// Create an operator state marked as finished
- OperatorState operatorState = new
FullyFinishedOperatorState(operatorIds.get(0), 3, 256);
+ OperatorState operatorState =
+ new FullyFinishedOperatorState(null, null, operatorIds.get(0),
3, 256);
states.put(operatorIds.get(0), operatorState);
Map<OperatorID, ExecutionJobVertex> vertices =
@@ -1043,7 +1044,7 @@ class StateAssignmentOperationTest {
@Test
void assigningStateHandlesCanNotBeNull() {
- OperatorState state = new OperatorState(new OperatorID(), 1, MAX_P);
+ OperatorState state = new OperatorState(null, null, new OperatorID(),
1, MAX_P);
List<KeyedStateHandle> managedKeyedStateHandles =
StateAssignmentOperation.getManagedKeyedStateHandles(state,
KeyGroupRange.of(0, 1));
@@ -1071,7 +1072,8 @@ class StateAssignmentOperationTest {
Function.identity(),
operatorID -> {
OperatorState state =
- new OperatorState(operatorID,
numSubTasks, MAX_P);
+ new OperatorState(
+ "", "", operatorID,
numSubTasks, MAX_P);
for (int i = 0; i < numSubTasks; i++) {
state.putState(
i,
@@ -1241,7 +1243,8 @@ class StateAssignmentOperationTest {
new JobVertex(
operatorID.toHexString(),
new JobVertexID(),
- singletonList(OperatorIDPair.of(operatorID,
userDefinedOperatorId)));
+ singletonList(
+ OperatorIDPair.of(operatorID,
userDefinedOperatorId, "", "")));
jobVertex.setInvokableClass(NoOpInvokable.class);
jobVertex.setParallelism(parallelism);
return jobVertex;
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/VertexFinishedStateCheckerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/VertexFinishedStateCheckerTest.java
index 0cf366a47c5..d21ca01f44e 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/VertexFinishedStateCheckerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/VertexFinishedStateCheckerTest.java
@@ -69,7 +69,7 @@ class VertexFinishedStateCheckerTest {
final JobVertexID jobVertexID1 = new JobVertexID();
final JobVertexID jobVertexID2 = new JobVertexID();
// The op1 has uidHash set.
- OperatorIDPair op1 = OperatorIDPair.of(new OperatorID(), new
OperatorID());
+ OperatorIDPair op1 = OperatorIDPair.of(new OperatorID(), new
OperatorID(), "", "");
OperatorIDPair op2 = OperatorIDPair.generatedIDOnly(new OperatorID());
OperatorIDPair op3 = OperatorIDPair.generatedIDOnly(new OperatorID());
@@ -82,10 +82,10 @@ class VertexFinishedStateCheckerTest {
Map<OperatorID, OperatorState> operatorStates = new HashMap<>();
operatorStates.put(
useUidHash ? op1.getUserDefinedOperatorID().get() :
op1.getGeneratedOperatorID(),
- new FullyFinishedOperatorState(op1.getGeneratedOperatorID(),
1, 1));
+ new FullyFinishedOperatorState(null, null,
op1.getGeneratedOperatorID(), 1, 1));
operatorStates.put(
op2.getGeneratedOperatorID(),
- new OperatorState(op2.getGeneratedOperatorID(), 1, 1));
+ new OperatorState(null, null, op2.getGeneratedOperatorID(), 1,
1));
Set<ExecutionJobVertex> vertices = new HashSet<>();
vertices.add(graph.getJobVertex(jobVertexID1));
@@ -292,13 +292,13 @@ class VertexFinishedStateCheckerTest {
OperatorID operatorId,
VertexFinishedStateChecker.VertexFinishedState finishedState) {
switch (finishedState) {
case ALL_RUNNING:
- return new OperatorState(operatorId, 2, 2);
+ return new OperatorState(null, null, operatorId, 2, 2);
case PARTIALLY_FINISHED:
- OperatorState operatorState = new OperatorState(operatorId, 2,
2);
+ OperatorState operatorState = new OperatorState(null, null,
operatorId, 2, 2);
operatorState.putState(0,
FinishedOperatorSubtaskState.INSTANCE);
return operatorState;
case FULLY_FINISHED:
- return new FullyFinishedOperatorState(operatorId, 2, 2);
+ return new FullyFinishedOperatorState(null, null, operatorId,
2, 2);
default:
throw new UnsupportedOperationException(
"Not supported finished state: " + finishedState);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java
index 63836bdc920..6cfb3a605cf 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java
@@ -97,7 +97,13 @@ public class CheckpointTestUtils {
+ numFullyFinishedTaskStates);
for (int stateIdx = 0; stateIdx < numAllRunningTaskStates; ++stateIdx)
{
- OperatorState taskState = new OperatorState(new OperatorID(),
numSubtasksPerTask, 128);
+ OperatorState taskState =
+ new OperatorState(
+ "operatorName-" + stateIdx,
+ "operatorUid-" + stateIdx,
+ new OperatorID(),
+ numSubtasksPerTask,
+ 128);
randomlySetCoordinatorState(taskState, random);
randomlySetSubtaskState(
taskState, IntStream.range(0,
numSubtasksPerTask).toArray(), random, basePath);
@@ -105,7 +111,8 @@ public class CheckpointTestUtils {
}
for (int stateIdx = 0; stateIdx < numPartlyFinishedTaskStates;
++stateIdx) {
- OperatorState taskState = new OperatorState(new OperatorID(),
numSubtasksPerTask, 128);
+ OperatorState taskState =
+ new OperatorState(null, null, new OperatorID(),
numSubtasksPerTask, 128);
randomlySetCoordinatorState(taskState, random);
randomlySetSubtaskState(
taskState,
@@ -122,7 +129,8 @@ public class CheckpointTestUtils {
for (int stateIdx = 0; stateIdx < numFullyFinishedTaskStates;
++stateIdx) {
taskStates.add(
- new FullyFinishedOperatorState(new OperatorID(),
numSubtasksPerTask, 128));
+ new FullyFinishedOperatorState(
+ null, null, new OperatorID(), numSubtasksPerTask,
128));
}
return taskStates;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV5SerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV5SerializerTest.java
new file mode 100644
index 00000000000..f0a82a01124
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV5SerializerTest.java
@@ -0,0 +1,101 @@
+package org.apache.flink.runtime.checkpoint.metadata;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.runtime.checkpoint.OperatorState;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.Random;
+
+import static java.util.Collections.emptyList;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertThrows;
+
+/** {@link MetadataV5Serializer} test. */
+class MetadataV5SerializerTest {
+
+ private static final MetadataSerializer INSTANCE = MetadataV5Serializer.INSTANCE;
+
+ private static final Random RND = new Random();
+
+ private Collection<OperatorState> taskStates;
+
+ private CheckpointMetadata metadata;
+
+ @BeforeEach
+ public void beforeEach(@TempDir Path tempDir) throws IOException {
+ taskStates =
+ CheckpointTestUtils.createOperatorStates(
+ RND, tempDir.toUri().toString(), 1, 0, 0, 0);
+ metadata = new CheckpointMetadata(1L, taskStates, emptyList(), null);
+ }
+
+ @Test
+ void testSerializeOperatorUidAndName() throws IOException {
+ try (ByteArrayOutputStream out = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(out)) {
+ INSTANCE.serialize(metadata, dos);
+
+ try (DataInputStream dis =
+ new DataInputStream(new ByteArrayInputStream(out.toByteArray()))) {
+ CheckpointMetadata deserializedMetadata =
+ INSTANCE.deserialize(dis, metadata.getClass().getClassLoader(), "");
+ Collection<OperatorState> operatorStates = deserializedMetadata.getOperatorStates();
+ assertThat(operatorStates).hasSize(1);
+ OperatorState operatorState = operatorStates.iterator().next();
+ assertThat(operatorState.getOperatorName()).isPresent();
+ assertThat(operatorState.getOperatorName().get()).isEqualTo("operatorName-0");
+ assertThat(operatorState.getOperatorUid()).isPresent();
+ assertThat(operatorState.getOperatorUid().get()).isEqualTo("operatorUid-0");
+ }
+ }
+ }
+
+ @Test
+ void testSerializeOperatorNameWithEmptyValue() throws IOException {
+ taskStates.iterator().next().setOperatorName("");
+ testSerializeOperatorWithEmptyValue("Empty string operator name is not allowed");
+ }
+
+ @Test
+ void testSerializeOperatorUidWithEmptyValue() throws IOException {
+ taskStates.iterator().next().setOperatorUid("");
+ testSerializeOperatorWithEmptyValue("Empty string operator uid is not allowed");
+ }
+
+ void testSerializeOperatorWithEmptyValue(String exceptionMessage) throws IOException {
+ try (ByteArrayOutputStream out = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(out)) {
+ final IllegalArgumentException exception =
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> INSTANCE.serialize(metadata, dos));
+ assertThat(exception.getMessage()).contains(exceptionMessage);
+ }
+ }
+}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestUtils.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestUtils.java
index b5054b0aeb7..ef486068c93 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestUtils.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestUtils.java
@@ -69,7 +69,7 @@ public class TestUtils {
Collection<OperatorState> operatorStates = new
ArrayList<>(operatorIds.length);
for (OperatorID operatorId : operatorIds) {
- final OperatorState operatorState = new OperatorState(operatorId,
1, 42);
+ final OperatorState operatorState = new OperatorState(null, null,
operatorId, 1, 42);
final OperatorSubtaskState subtaskState =
OperatorSubtaskState.builder()
.setManagedOperatorState(
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
index 5882da9656f..a1c5c3cabc5 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
@@ -667,7 +667,8 @@ class OperatorCoordinatorSchedulerTest {
boolean restartAllOnFailover)
throws Exception {
- final OperatorIDPair opIds = OperatorIDPair.of(new OperatorID(),
provider.getOperatorId());
+ final OperatorIDPair opIds =
+ OperatorIDPair.of(new OperatorID(), provider.getOperatorId(),
"", "");
final JobVertex vertex =
new JobVertex(
"Vertex with OperatorCoordinator",
@@ -948,7 +949,7 @@ class OperatorCoordinatorSchedulerTest {
}
private static OperatorState createOperatorState(OperatorID id, byte[]
coordinatorState) {
- final OperatorState state = new OperatorState(id, 10, 16384);
+ final OperatorState state = new OperatorState(null, null, id, 10,
16384);
state.setCoordinatorState(new ByteStreamStateHandle("name",
coordinatorState));
return state;
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
index 79113f7fb39..cffc163ed0a 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
@@ -153,7 +153,7 @@ class SchedulerUtilsTest {
private CompletedCheckpoint buildCheckpoint(KeyedStateHandle incremental) {
OperatorID operatorID = new OperatorID();
- OperatorState operatorState = new OperatorState(operatorID, 1, 1);
+ OperatorState operatorState = new OperatorState(null, null,
operatorID, 1, 1);
operatorState.putState(
0,
OperatorSubtaskState.builder().setManagedKeyedState(incremental).build());
return new CompletedCheckpoint(
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
index 59f2f6ebefe..f414ed03374 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
@@ -362,7 +362,7 @@ class SharedStateRegistryTest {
new StateHandleID(stateId));
OperatorID operatorID = new OperatorID();
- OperatorState operatorState = new OperatorState(operatorID, 1, 1);
+ OperatorState operatorState = new OperatorState(null, null,
operatorID, 1, 1);
operatorState.putState(
0,
OperatorSubtaskState.builder().setManagedKeyedState(initialHandle).build());