This is an automated email from the ASF dual-hosted git repository.

gaborgsomogyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c89c41606fe [FLINK-37460][state] Make state processor API checkpoint 
ID configurable
c89c41606fe is described below

commit c89c41606fe609208f10d8fc08999145b612c6a2
Author: Gabor Somogyi <[email protected]>
AuthorDate: Fri Mar 14 08:11:04 2025 +0100

    [FLINK-37460][state] Make state processor API checkpoint ID configurable
---
 .../flink/state/api/KeyedStateTransformation.java  |  9 +++-
 .../state/api/OneInputStateTransformation.java     | 20 +++++---
 .../flink/state/api/OperatorTransformation.java    | 15 +++++-
 .../apache/flink/state/api/SavepointReader.java    | 10 +++-
 .../apache/flink/state/api/SavepointWriter.java    | 55 +++++++++++++++++++---
 .../state/api/WindowedStateTransformation.java     | 25 +++++++---
 .../state/api/output/MergeOperatorStates.java      |  7 ++-
 .../flink/state/api/output/SnapshotUtils.java      | 11 +++--
 .../operators/BroadcastStateBootstrapOperator.java | 11 ++++-
 .../operators/KeyedStateBootstrapOperator.java     |  9 +++-
 .../output/operators/StateBootstrapOperator.java   |  9 +++-
 .../operators/StateBootstrapWrapperOperator.java   |  9 +++-
 .../StateBootstrapWrapperOperatorFactory.java      | 11 ++++-
 .../api/runtime/metadata/SavepointMetadataV2.java  |  8 ++++
 .../flink/state/api/SavepointWriterITCase.java     | 18 +++++--
 .../output/KeyedStateBootstrapOperatorTest.java    |  4 +-
 .../flink/state/api/output/SnapshotUtilsTest.java  |  2 +-
 17 files changed, 188 insertions(+), 45 deletions(-)

diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/KeyedStateTransformation.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/KeyedStateTransformation.java
index 68228b96e45..b088afed2ef 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/KeyedStateTransformation.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/KeyedStateTransformation.java
@@ -44,6 +44,9 @@ public class KeyedStateTransformation<K, T> {
     /** The data set containing the data to bootstrap the operator state with. 
*/
     private final DataStream<T> stream;
 
+    /** Checkpoint ID. */
+    private final long checkpointId;
+
     /** Local max parallelism for the bootstrapped operator. */
     private final OptionalInt operatorMaxParallelism;
 
@@ -55,10 +58,12 @@ public class KeyedStateTransformation<K, T> {
 
     KeyedStateTransformation(
             DataStream<T> stream,
+            long checkpointId,
             OptionalInt operatorMaxParallelism,
             KeySelector<T, K> keySelector,
             TypeInformation<K> keyType) {
         this.stream = stream;
+        this.checkpointId = checkpointId;
         this.operatorMaxParallelism = operatorMaxParallelism;
         this.keySelector = keySelector;
         this.keyType = keyType;
@@ -80,7 +85,7 @@ public class KeyedStateTransformation<K, T> {
                 (timestamp, path) ->
                         SimpleOperatorFactory.of(
                                 new KeyedStateBootstrapOperator<>(
-                                        timestamp, path, processFunction));
+                                        checkpointId, timestamp, path, 
processFunction));
         return transform(factory);
     }
 
@@ -112,6 +117,6 @@ public class KeyedStateTransformation<K, T> {
     public <W extends Window> WindowedStateTransformation<T, K, W> window(
             WindowAssigner<? super T, W> assigner) {
         return new WindowedStateTransformation<>(
-                stream, operatorMaxParallelism, keySelector, keyType, 
assigner);
+                stream, checkpointId, operatorMaxParallelism, keySelector, 
keyType, assigner);
     }
 }
diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OneInputStateTransformation.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OneInputStateTransformation.java
index a1e1191a148..8cc21a16390 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OneInputStateTransformation.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OneInputStateTransformation.java
@@ -49,11 +49,15 @@ public class OneInputStateTransformation<T> {
     /** The data stream containing the data to bootstrap the operator state 
with. */
     private final DataStream<T> stream;
 
+    /** Checkpoint ID. */
+    private final long checkpointId;
+
     /** Local max parallelism for the bootstrapped operator. */
     private OptionalInt operatorMaxParallelism = OptionalInt.empty();
 
-    OneInputStateTransformation(DataStream<T> stream) {
+    OneInputStateTransformation(DataStream<T> stream, long checkpointId) {
         this.stream = stream;
+        this.checkpointId = checkpointId;
     }
 
     /**
@@ -84,7 +88,8 @@ public class OneInputStateTransformation<T> {
         SavepointWriterOperatorFactory factory =
                 (timestamp, path) ->
                         SimpleOperatorFactory.of(
-                                new StateBootstrapOperator<>(timestamp, path, 
processFunction));
+                                new StateBootstrapOperator<>(
+                                        checkpointId, timestamp, path, 
processFunction));
 
         return transform(factory);
     }
@@ -105,7 +110,7 @@ public class OneInputStateTransformation<T> {
                 (timestamp, path) ->
                         SimpleOperatorFactory.of(
                                 new BroadcastStateBootstrapOperator<>(
-                                        timestamp, path, processFunction));
+                                        checkpointId, timestamp, path, 
processFunction));
 
         return transform(factory);
     }
@@ -133,7 +138,8 @@ public class OneInputStateTransformation<T> {
     public <K> KeyedStateTransformation<K, T> keyBy(KeySelector<T, K> 
keySelector) {
         TypeInformation<K> keyType =
                 TypeExtractor.getKeySelectorTypes(keySelector, 
stream.getType());
-        return new KeyedStateTransformation<>(stream, operatorMaxParallelism, 
keySelector, keyType);
+        return new KeyedStateTransformation<>(
+                stream, checkpointId, operatorMaxParallelism, keySelector, 
keyType);
     }
 
     /**
@@ -146,7 +152,8 @@ public class OneInputStateTransformation<T> {
      */
     public <K> KeyedStateTransformation<K, T> keyBy(
             KeySelector<T, K> keySelector, TypeInformation<K> keyType) {
-        return new KeyedStateTransformation<>(stream, operatorMaxParallelism, 
keySelector, keyType);
+        return new KeyedStateTransformation<>(
+                stream, checkpointId, operatorMaxParallelism, keySelector, 
keyType);
     }
 
     /**
@@ -186,6 +193,7 @@ public class OneInputStateTransformation<T> {
 
         TypeInformation<Tuple> keyType =
                 TypeExtractor.getKeySelectorTypes(keySelector, 
stream.getType());
-        return new KeyedStateTransformation<>(stream, operatorMaxParallelism, 
keySelector, keyType);
+        return new KeyedStateTransformation<>(
+                stream, checkpointId, operatorMaxParallelism, keySelector, 
keyType);
     }
 }
diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OperatorTransformation.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OperatorTransformation.java
index cca55a09321..29e8ea2cb16 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OperatorTransformation.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OperatorTransformation.java
@@ -64,6 +64,19 @@ public final class OperatorTransformation {
      * @return A {@link OneInputStateTransformation}.
      */
     public static <T> OneInputStateTransformation<T> 
bootstrapWith(DataStream<T> stream) {
-        return new OneInputStateTransformation<>(stream);
+        return new OneInputStateTransformation<>(stream, 0L);
+    }
+
+    /**
+     * Create a new {@link OneInputStateTransformation} from a {@link 
DataStream}.
+     *
+     * @param stream A data stream of elements.
+     * @param checkpointId checkpoint ID.
+     * @param <T> The type of the input.
+     * @return A {@link OneInputStateTransformation}.
+     */
+    public static <T> OneInputStateTransformation<T> bootstrapWith(
+            DataStream<T> stream, long checkpointId) {
+        return new OneInputStateTransformation<>(stream, checkpointId);
     }
 }
diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointReader.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointReader.java
index ec4850e8f24..7c6bdf527d0 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointReader.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointReader.java
@@ -82,7 +82,10 @@ public class SavepointReader {
 
         SavepointMetadataV2 savepointMetadata =
                 new SavepointMetadataV2(
-                        maxParallelism, metadata.getMasterStates(), 
metadata.getOperatorStates());
+                        metadata.getCheckpointId(),
+                        maxParallelism,
+                        metadata.getMasterStates(),
+                        metadata.getOperatorStates());
         return new SavepointReader(env, savepointMetadata, null);
     }
 
@@ -111,7 +114,10 @@ public class SavepointReader {
 
         SavepointMetadataV2 savepointMetadata =
                 new SavepointMetadataV2(
-                        maxParallelism, metadata.getMasterStates(), 
metadata.getOperatorStates());
+                        metadata.getCheckpointId(),
+                        maxParallelism,
+                        metadata.getMasterStates(),
+                        metadata.getOperatorStates());
         return new SavepointReader(env, savepointMetadata, stateBackend);
     }
 
diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointWriter.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointWriter.java
index b8688da7169..8c9e32ecd1b 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointWriter.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointWriter.java
@@ -108,7 +108,10 @@ public class SavepointWriter {
                                                 "Savepoint must contain at 
least one operator state."));
 
         return new SavepointMetadataV2(
-                maxParallelism, metadata.getMasterStates(), 
metadata.getOperatorStates());
+                metadata.getCheckpointId(),
+                maxParallelism,
+                metadata.getMasterStates(),
+                metadata.getOperatorStates());
     }
 
     /**
@@ -123,13 +126,48 @@ public class SavepointWriter {
     public static SavepointWriter newSavepoint(
             StreamExecutionEnvironment executionEnvironment, int 
maxParallelism) {
         return new SavepointWriter(
-                createSavepointMetadata(maxParallelism), null, 
executionEnvironment);
+                createSavepointMetadata(0L, maxParallelism), null, 
executionEnvironment);
+    }
+
+    /**
+     * Creates a new savepoint. The savepoint will be written using the state 
backend defined via
+     * the clusters configuration.
+     *
+     * @param maxParallelism The max parallelism of the savepoint.
+     * @param checkpointId checkpoint ID.
+     * @return A {@link SavepointWriter}.
+     * @see #newSavepoint(StreamExecutionEnvironment, StateBackend, int)
+     * @see #withConfiguration(ConfigOption, Object)
+     */
+    public static SavepointWriter newSavepoint(
+            StreamExecutionEnvironment executionEnvironment,
+            long checkpointId,
+            int maxParallelism) {
+        return new SavepointWriter(
+                createSavepointMetadata(checkpointId, maxParallelism), null, 
executionEnvironment);
+    }
+
+    /**
+     * Creates a new savepoint.
+     *
+     * @param stateBackend The state backend of the savepoint used for keyed 
state.
+     * @param maxParallelism The max parallelism of the savepoint.
+     * @return A {@link SavepointWriter}.
+     * @see #newSavepoint(StreamExecutionEnvironment, int)
+     */
+    public static SavepointWriter newSavepoint(
+            StreamExecutionEnvironment executionEnvironment,
+            StateBackend stateBackend,
+            int maxParallelism) {
+        return new SavepointWriter(
+                createSavepointMetadata(0L, maxParallelism), stateBackend, 
executionEnvironment);
     }
 
     /**
      * Creates a new savepoint.
      *
      * @param stateBackend The state backend of the savepoint used for keyed 
state.
+     * @param checkpointId checkpoint ID.
      * @param maxParallelism The max parallelism of the savepoint.
      * @return A {@link SavepointWriter}.
      * @see #newSavepoint(StreamExecutionEnvironment, int)
@@ -137,12 +175,16 @@ public class SavepointWriter {
     public static SavepointWriter newSavepoint(
             StreamExecutionEnvironment executionEnvironment,
             StateBackend stateBackend,
+            long checkpointId,
             int maxParallelism) {
         return new SavepointWriter(
-                createSavepointMetadata(maxParallelism), stateBackend, 
executionEnvironment);
+                createSavepointMetadata(checkpointId, maxParallelism),
+                stateBackend,
+                executionEnvironment);
     }
 
-    private static SavepointMetadataV2 createSavepointMetadata(int 
maxParallelism) {
+    private static SavepointMetadataV2 createSavepointMetadata(
+            long checkpointId, int maxParallelism) {
         Preconditions.checkArgument(
                 maxParallelism > 0 && maxParallelism <= 
UPPER_BOUND_MAX_PARALLELISM,
                 "Maximum parallelism must be between 1 and "
@@ -151,7 +193,7 @@ public class SavepointWriter {
                         + maxParallelism);
 
         return new SavepointMetadataV2(
-                maxParallelism, Collections.emptyList(), 
Collections.emptyList());
+                checkpointId, maxParallelism, Collections.emptyList(), 
Collections.emptyList());
     }
 
     /**
@@ -295,7 +337,8 @@ public class SavepointWriter {
                         "reduce(OperatorState)",
                         TypeInformation.of(CheckpointMetadata.class),
                         new GroupReduceOperator<>(
-                                new 
MergeOperatorStates(metadata.getMasterStates())))
+                                new MergeOperatorStates(
+                                        metadata.getCheckpointId(), 
metadata.getMasterStates())))
                 .forceNonParallel()
                 .map(new 
CheckpointMetadataCheckpointMetadataMapFunction(this.uidTransformationMap))
                 .setParallelism(1)
diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedStateTransformation.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedStateTransformation.java
index d239d49a741..60e32b7babb 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedStateTransformation.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedStateTransformation.java
@@ -57,6 +57,8 @@ public class WindowedStateTransformation<T, K, W extends 
Window> {
 
     private final WindowOperatorBuilder<T, K, W> builder;
 
+    private final long checkpointId;
+
     private final OptionalInt operatorMaxParallelism;
 
     private final KeySelector<T, K> keySelector;
@@ -65,11 +67,13 @@ public class WindowedStateTransformation<T, K, W extends 
Window> {
 
     WindowedStateTransformation(
             DataStream<T> input,
+            long checkpointId,
             OptionalInt operatorMaxParallelism,
             KeySelector<T, K> keySelector,
             TypeInformation<K> keyType,
             WindowAssigner<? super T, W> windowAssigner) {
         this.input = input;
+        this.checkpointId = checkpointId;
         this.operatorMaxParallelism = operatorMaxParallelism;
         this.keySelector = keySelector;
         this.keyType = keyType;
@@ -158,7 +162,8 @@ public class WindowedStateTransformation<T, K, W extends 
Window> {
 
         SavepointWriterOperatorFactory factory =
                 (timestamp, path) ->
-                        new StateBootstrapWrapperOperatorFactory<>(timestamp, 
path, operator);
+                        new StateBootstrapWrapperOperatorFactory<>(
+                                checkpointId, timestamp, path, operator);
         return new StateBootstrapTransformation<>(
                 input, operatorMaxParallelism, factory, keySelector, keyType);
     }
@@ -186,7 +191,8 @@ public class WindowedStateTransformation<T, K, W extends 
Window> {
 
         SavepointWriterOperatorFactory factory =
                 (timestamp, path) ->
-                        new StateBootstrapWrapperOperatorFactory<>(timestamp, 
path, operator);
+                        new StateBootstrapWrapperOperatorFactory<>(
+                                checkpointId, timestamp, path, operator);
         return new StateBootstrapTransformation<>(
                 input, operatorMaxParallelism, factory, keySelector, keyType);
     }
@@ -323,7 +329,8 @@ public class WindowedStateTransformation<T, K, W extends 
Window> {
 
         SavepointWriterOperatorFactory factory =
                 (timestamp, path) ->
-                        new StateBootstrapWrapperOperatorFactory<>(timestamp, 
path, operator);
+                        new StateBootstrapWrapperOperatorFactory<>(
+                                checkpointId, timestamp, path, operator);
         return new StateBootstrapTransformation<>(
                 input, operatorMaxParallelism, factory, keySelector, keyType);
     }
@@ -402,7 +409,8 @@ public class WindowedStateTransformation<T, K, W extends 
Window> {
 
         SavepointWriterOperatorFactory factory =
                 (timestamp, path) ->
-                        new StateBootstrapWrapperOperatorFactory<>(timestamp, 
path, operator);
+                        new StateBootstrapWrapperOperatorFactory<>(
+                                checkpointId, timestamp, path, operator);
         return new StateBootstrapTransformation<>(
                 input, operatorMaxParallelism, factory, keySelector, keyType);
     }
@@ -428,7 +436,8 @@ public class WindowedStateTransformation<T, K, W extends 
Window> {
 
         SavepointWriterOperatorFactory factory =
                 (timestamp, path) ->
-                        new StateBootstrapWrapperOperatorFactory<>(timestamp, 
path, operator);
+                        new StateBootstrapWrapperOperatorFactory<>(
+                                checkpointId, timestamp, path, operator);
         return new StateBootstrapTransformation<>(
                 input, operatorMaxParallelism, factory, keySelector, keyType);
     }
@@ -454,7 +463,8 @@ public class WindowedStateTransformation<T, K, W extends 
Window> {
 
         SavepointWriterOperatorFactory factory =
                 (timestamp, path) ->
-                        new StateBootstrapWrapperOperatorFactory<>(timestamp, 
path, operator);
+                        new StateBootstrapWrapperOperatorFactory<>(
+                                checkpointId, timestamp, path, operator);
         return new StateBootstrapTransformation<>(
                 input, operatorMaxParallelism, factory, keySelector, keyType);
     }
@@ -477,7 +487,8 @@ public class WindowedStateTransformation<T, K, W extends 
Window> {
 
         SavepointWriterOperatorFactory factory =
                 (timestamp, path) ->
-                        new StateBootstrapWrapperOperatorFactory<>(timestamp, 
path, operator);
+                        new StateBootstrapWrapperOperatorFactory<>(
+                                checkpointId, timestamp, path, operator);
         return new StateBootstrapTransformation<>(
                 input, operatorMaxParallelism, factory, keySelector, keyType);
     }
diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/MergeOperatorStates.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/MergeOperatorStates.java
index f00bfa34943..584ae131fb7 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/MergeOperatorStates.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/MergeOperatorStates.java
@@ -39,11 +39,14 @@ public class MergeOperatorStates implements 
GroupReduceFunction<OperatorState, C
 
     private static final long serialVersionUID = 1L;
 
+    private final long checkpointId;
+
     private final Collection<MasterState> masterStates;
 
-    public MergeOperatorStates(Collection<MasterState> masterStates) {
+    public MergeOperatorStates(long checkpointId, Collection<MasterState> 
masterStates) {
         Preconditions.checkNotNull(masterStates, "Master state metadata must 
not be null");
 
+        this.checkpointId = checkpointId;
         this.masterStates = masterStates;
     }
 
@@ -51,7 +54,7 @@ public class MergeOperatorStates implements 
GroupReduceFunction<OperatorState, C
     public void reduce(Iterable<OperatorState> values, 
Collector<CheckpointMetadata> out) {
         CheckpointMetadata metadata =
                 new CheckpointMetadata(
-                        SnapshotUtils.CHECKPOINT_ID,
+                        checkpointId,
                         StreamSupport.stream(values.spliterator(), false)
                                 .collect(Collectors.toList()),
                         masterStates);
diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java
index b4b6e5dbd6a..2c7664234ab 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java
@@ -41,11 +41,10 @@ import static 
org.apache.flink.configuration.CheckpointingOptions.FS_WRITE_BUFFE
 /** Takes a final snapshot of the state of an operator subtask. */
 @Internal
 public final class SnapshotUtils {
-    static final long CHECKPOINT_ID = 0L;
-
     private SnapshotUtils() {}
 
     public static <OUT, OP extends StreamOperator<OUT>> 
TaggedOperatorSubtaskState snapshot(
+            long checkpointId,
             OP operator,
             int index,
             long timestamp,
@@ -64,21 +63,22 @@ public final class SnapshotUtils {
                         isUnalignedCheckpoint,
                         CheckpointOptions.NO_ALIGNED_CHECKPOINT_TIME_OUT);
 
-        operator.prepareSnapshotPreBarrier(CHECKPOINT_ID);
+        operator.prepareSnapshotPreBarrier(checkpointId);
 
         CheckpointStreamFactory storage = createStreamFactory(configuration, 
options);
 
         OperatorSnapshotFutures snapshotInProgress =
-                operator.snapshotState(CHECKPOINT_ID, timestamp, options, 
storage);
+                operator.snapshotState(checkpointId, timestamp, options, 
storage);
 
         OperatorSubtaskState state =
                 new 
OperatorSnapshotFinalizer(snapshotInProgress).getJobManagerOwnedState();
 
-        operator.notifyCheckpointComplete(CHECKPOINT_ID);
+        operator.notifyCheckpointComplete(checkpointId);
         return new TaggedOperatorSubtaskState(index, state);
     }
 
     public static <OUT, OP extends StreamOperator<OUT>> 
TaggedOperatorSubtaskState snapshot(
+            long checkpointId,
             OP operator,
             int index,
             long timestamp,
@@ -89,6 +89,7 @@ public final class SnapshotUtils {
             throws Exception {
 
         return snapshot(
+                checkpointId,
                 operator,
                 index,
                 timestamp,
diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/BroadcastStateBootstrapOperator.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/BroadcastStateBootstrapOperator.java
index 04aee7899c6..8dfb6d3e124 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/BroadcastStateBootstrapOperator.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/BroadcastStateBootstrapOperator.java
@@ -44,6 +44,8 @@ public class BroadcastStateBootstrapOperator<IN>
 
     private static final long serialVersionUID = 1L;
 
+    private final long checkpointId;
+
     private final long timestamp;
 
     private final Path savepointPath;
@@ -51,10 +53,14 @@ public class BroadcastStateBootstrapOperator<IN>
     private transient ContextImpl context;
 
     public BroadcastStateBootstrapOperator(
-            long timestamp, Path savepointPath, 
BroadcastStateBootstrapFunction<IN> function) {
+            long checkpointId,
+            long timestamp,
+            Path savepointPath,
+            BroadcastStateBootstrapFunction<IN> function) {
         super(function);
-        this.timestamp = timestamp;
 
+        this.checkpointId = checkpointId;
+        this.timestamp = timestamp;
         this.savepointPath = savepointPath;
     }
 
@@ -73,6 +79,7 @@ public class BroadcastStateBootstrapOperator<IN>
     public void endInput() throws Exception {
         TaggedOperatorSubtaskState state =
                 SnapshotUtils.snapshot(
+                        checkpointId,
                         this,
                         
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
                         timestamp,
diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/KeyedStateBootstrapOperator.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/KeyedStateBootstrapOperator.java
index b3be5b988d6..6f4b4a1c223 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/KeyedStateBootstrapOperator.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/KeyedStateBootstrapOperator.java
@@ -48,6 +48,8 @@ public class KeyedStateBootstrapOperator<K, IN>
 
     private static final long serialVersionUID = 1L;
 
+    private final long checkpointId;
+
     private final long timestamp;
 
     private final Path savepointPath;
@@ -55,9 +57,13 @@ public class KeyedStateBootstrapOperator<K, IN>
     private transient KeyedStateBootstrapOperator<K, IN>.ContextImpl context;
 
     public KeyedStateBootstrapOperator(
-            long timestamp, Path savepointPath, KeyedStateBootstrapFunction<K, 
IN> function) {
+            long checkpointId,
+            long timestamp,
+            Path savepointPath,
+            KeyedStateBootstrapFunction<K, IN> function) {
         super(function);
 
+        this.checkpointId = checkpointId;
         this.timestamp = timestamp;
         this.savepointPath = savepointPath;
     }
@@ -88,6 +94,7 @@ public class KeyedStateBootstrapOperator<K, IN>
     public void endInput() throws Exception {
         TaggedOperatorSubtaskState state =
                 SnapshotUtils.snapshot(
+                        checkpointId,
                         this,
                         
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
                         timestamp,
diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapOperator.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapOperator.java
index 4ae00d8e2d4..149aa938b02 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapOperator.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapOperator.java
@@ -40,6 +40,8 @@ public class StateBootstrapOperator<IN>
 
     private static final long serialVersionUID = 1L;
 
+    private final long checkpointId;
+
     private final long timestamp;
 
     private final Path savepointPath;
@@ -47,9 +49,13 @@ public class StateBootstrapOperator<IN>
     private transient ContextImpl context;
 
     public StateBootstrapOperator(
-            long timestamp, Path savepointPath, StateBootstrapFunction<IN> 
function) {
+            long checkpointId,
+            long timestamp,
+            Path savepointPath,
+            StateBootstrapFunction<IN> function) {
         super(function);
 
+        this.checkpointId = checkpointId;
         this.timestamp = timestamp;
         this.savepointPath = savepointPath;
     }
@@ -69,6 +75,7 @@ public class StateBootstrapOperator<IN>
     public void endInput() throws Exception {
         TaggedOperatorSubtaskState state =
                 SnapshotUtils.snapshot(
+                        checkpointId,
                         this,
                         
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
                         timestamp,
diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java
index 7b91608b93a..4170c8f66f6 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java
@@ -53,6 +53,8 @@ public final class StateBootstrapWrapperOperator<
 
     private static final long serialVersionUID = 1L;
 
+    private final long checkpointId;
+
     private final long timestamp;
 
     private final Path savepointPath;
@@ -62,7 +64,11 @@ public final class StateBootstrapWrapperOperator<
     private final WindowOperator<?, IN, ?, ?, ?> operator;
 
     public StateBootstrapWrapperOperator(
-            long timestamp, Path savepointPath, WindowOperator<?, IN, ?, ?, ?> 
operator) {
+            long checkpointId,
+            long timestamp,
+            Path savepointPath,
+            WindowOperator<?, IN, ?, ?, ?> operator) {
+        this.checkpointId = checkpointId;
         this.timestamp = timestamp;
         this.savepointPath = savepointPath;
         this.operator = operator;
@@ -172,6 +178,7 @@ public final class StateBootstrapWrapperOperator<
     public void endInput() throws Exception {
         TaggedOperatorSubtaskState state =
                 SnapshotUtils.snapshot(
+                        checkpointId,
                         this,
                         operator.getContainingTask()
                                 .getEnvironment()
diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperatorFactory.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperatorFactory.java
index 2de1ecf56e2..e6f9a50f1e4 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperatorFactory.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperatorFactory.java
@@ -47,6 +47,8 @@ public class StateBootstrapWrapperOperatorFactory<
                 OPF extends OneInputStreamOperatorFactory<IN, OUT>>
         extends AbstractStreamOperatorFactory<TaggedOperatorSubtaskState> {
 
+    private final long checkpointId;
+
     private final long timestamp;
 
     private final Path savepointPath;
@@ -56,7 +58,11 @@ public class StateBootstrapWrapperOperatorFactory<
     private final WindowOperator<?, IN, ?, ?, ?> operator;
 
     public StateBootstrapWrapperOperatorFactory(
-            long timestamp, Path savepointPath, WindowOperator<?, IN, ?, ?, ?> 
operator) {
+            long checkpointId,
+            long timestamp,
+            Path savepointPath,
+            WindowOperator<?, IN, ?, ?, ?> operator) {
+        this.checkpointId = checkpointId;
         this.timestamp = timestamp;
         this.savepointPath = savepointPath;
         this.operator = operator;
@@ -67,7 +73,8 @@ public class StateBootstrapWrapperOperatorFactory<
     public <T extends StreamOperator<TaggedOperatorSubtaskState>> T 
createStreamOperator(
             StreamOperatorParameters<TaggedOperatorSubtaskState> parameters) {
         StateBootstrapWrapperOperator<IN, OUT, OP> wrapperOperator =
-                new StateBootstrapWrapperOperator<>(timestamp, savepointPath, 
operator);
+                new StateBootstrapWrapperOperator<>(
+                        checkpointId, timestamp, savepointPath, operator);
         wrapperOperator.setup(
                 parameters.getContainingTask(),
                 parameters.getStreamConfig(),
diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadataV2.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadataV2.java
index ab771364434..90faf9541f1 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadataV2.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadataV2.java
@@ -41,6 +41,8 @@ import static 
org.apache.flink.runtime.state.KeyGroupRangeAssignment.UPPER_BOUND
 @Internal
 public class SavepointMetadataV2 {
 
+    private final long checkpointId;
+
     private final int maxParallelism;
 
     private final Collection<MasterState> masterStates;
@@ -48,6 +50,7 @@ public class SavepointMetadataV2 {
     private final Map<OperatorID, OperatorStateSpecV2> operatorStateIndex;
 
     public SavepointMetadataV2(
+            long checkpointId,
             int maxParallelism,
             Collection<MasterState> masterStates,
             Collection<OperatorState> initialStates) {
@@ -59,6 +62,7 @@ public class SavepointMetadataV2 {
                         + maxParallelism);
         Preconditions.checkNotNull(masterStates);
 
+        this.checkpointId = checkpointId;
         this.maxParallelism = maxParallelism;
         this.masterStates = new ArrayList<>(masterStates);
         this.operatorStateIndex = 
CollectionUtil.newHashMapWithExpectedSize(initialStates.size());
@@ -70,6 +74,10 @@ public class SavepointMetadataV2 {
                                 OperatorStateSpecV2.existing(existingState)));
     }
 
+    public long getCheckpointId() {
+        return checkpointId;
+    }
+
     public int getMaxParallelism() {
         return maxParallelism;
     }
diff --git 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java
 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java
index 87414cf66df..d7a5586b602 100644
--- 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java
+++ 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.StateBackendOptions;
+import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -38,6 +39,7 @@ import 
org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
 import org.apache.flink.state.api.functions.BroadcastStateBootstrapFunction;
 import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
 import org.apache.flink.state.api.functions.StateBootstrapFunction;
+import org.apache.flink.state.api.runtime.SavepointLoader;
 import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -66,6 +68,8 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** IT test for writing savepoints. */
 public class SavepointWriterITCase extends AbstractTestBaseJUnit4 {
 
+    private static final long CHECKPOINT_ID = 42;
+
     private static final String ACCOUNT_UID = "accounts";
 
     private static final String CURRENCY_UID = "currency";
@@ -120,18 +124,18 @@ public class SavepointWriterITCase extends 
AbstractTestBaseJUnit4 {
         env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
 
         StateBootstrapTransformation<Account> transformation =
-                OperatorTransformation.bootstrapWith(env.fromData(accounts))
+                OperatorTransformation.bootstrapWith(env.fromData(accounts), 
CHECKPOINT_ID)
                         .keyBy(acc -> acc.id)
                         .transform(new AccountBootstrapper());
 
         StateBootstrapTransformation<CurrencyRate> broadcastTransformation =
-                
OperatorTransformation.bootstrapWith(env.fromData(currencyRates))
+                
OperatorTransformation.bootstrapWith(env.fromData(currencyRates), CHECKPOINT_ID)
                         .transform(new CurrencyBootstrapFunction());
 
         SavepointWriter writer =
                 backend == null
-                        ? SavepointWriter.newSavepoint(env, 128)
-                        : SavepointWriter.newSavepoint(env, backend, 128);
+                        ? SavepointWriter.newSavepoint(env, CHECKPOINT_ID, 128)
+                        : SavepointWriter.newSavepoint(env, backend, 
CHECKPOINT_ID, 128);
 
         writer.withOperator(OperatorIdentifier.forUid(ACCOUNT_UID), 
transformation)
                 .withOperator(getUidHashFromUid(CURRENCY_UID), 
broadcastTransformation)
@@ -142,6 +146,9 @@ public class SavepointWriterITCase extends 
AbstractTestBaseJUnit4 {
 
     private void validateBootstrap(Configuration configuration, String 
savepointPath)
             throws Exception {
+        CheckpointMetadata metadata = 
SavepointLoader.loadSavepointMetadata(savepointPath);
+        assertThat(metadata.getCheckpointId()).isEqualTo(CHECKPOINT_ID);
+
         StreamExecutionEnvironment env =
                 
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
 
@@ -192,6 +199,9 @@ public class SavepointWriterITCase extends 
AbstractTestBaseJUnit4 {
 
     private void validateModification(Configuration configuration, String 
savepointPath)
             throws Exception {
+        CheckpointMetadata metadata = 
SavepointLoader.loadSavepointMetadata(savepointPath);
+        assertThat(metadata.getCheckpointId()).isEqualTo(CHECKPOINT_ID);
+
         StreamExecutionEnvironment sEnv =
                 
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
 
diff --git 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/KeyedStateBootstrapOperatorTest.java
 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/KeyedStateBootstrapOperatorTest.java
index b3aaa093c27..b5b034a0762 100644
--- 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/KeyedStateBootstrapOperatorTest.java
+++ 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/KeyedStateBootstrapOperatorTest.java
@@ -62,7 +62,7 @@ public class KeyedStateBootstrapOperatorTest {
 
         OperatorSubtaskState state;
         KeyedStateBootstrapOperator<Long, Long> bootstrapOperator =
-                new KeyedStateBootstrapOperator<>(0L, path, new 
TimerBootstrapFunction());
+                new KeyedStateBootstrapOperator<>(0L, 0L, path, new 
TimerBootstrapFunction());
         try (KeyedOneInputStreamOperatorTestHarness<Long, Long, 
TaggedOperatorSubtaskState>
                 harness = getHarness(bootstrapOperator)) {
             processElements(harness, 1L, 2L, 3L);
@@ -93,7 +93,7 @@ public class KeyedStateBootstrapOperatorTest {
 
         OperatorSubtaskState state;
         KeyedStateBootstrapOperator<Long, Long> bootstrapOperator =
-                new KeyedStateBootstrapOperator<>(0L, path, new 
SimpleBootstrapFunction());
+                new KeyedStateBootstrapOperator<>(0L, 0L, path, new 
SimpleBootstrapFunction());
         try (KeyedOneInputStreamOperatorTestHarness<Long, Long, 
TaggedOperatorSubtaskState>
                 harness = getHarness(bootstrapOperator)) {
             processElements(harness, 1L, 2L, 3L);
diff --git 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java
 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java
index 0582fcc9d9a..3cedf154c0d 100644
--- 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java
+++ 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java
@@ -77,7 +77,7 @@ public class SnapshotUtilsTest {
         Path path = new Path(folder.newFolder().getAbsolutePath());
 
         SnapshotUtils.snapshot(
-                operator, 0, 0L, true, false, new Configuration(), path, 
savepointFormatType);
+                0L, operator, 0, 0L, true, false, new Configuration(), path, 
savepointFormatType);
 
         Assert.assertEquals(SavepointType.savepoint(savepointFormatType), 
actualSnapshotType);
         Assert.assertEquals(EXPECTED_CALL_OPERATOR_SNAPSHOT, 
ACTUAL_ORDER_TRACKING);


Reply via email to