This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b51da2707 [flink] auto create new tag for flink savepoint. (#1741)
b51da2707 is described below

commit b51da2707cb188ba5a5b78de98bd777a89f737ef
Author: liming.1018 <[email protected]>
AuthorDate: Tue Aug 22 11:59:57 2023 +0800

    [flink] auto create new tag for flink savepoint. (#1741)
---
 docs/content/maintenance/manage-tags.md            |  34 ++-
 .../generated/flink_connector_configuration.html   |   6 +
 .../paimon/table/AbstractFileStoreTable.java       |   3 +-
 .../java/org/apache/paimon/table/DataTable.java    |   3 +
 .../apache/paimon/table/system/AuditLogTable.java  |   6 +
 .../apache/paimon/table/system/BucketsTable.java   |   6 +
 .../org/apache/paimon/utils/SnapshotManager.java   |  39 ++++
 .../apache/paimon/flink/FlinkConnectorOptions.java |   7 +
 .../sink/AutoTagForSavepointCommitterOperator.java | 247 +++++++++++++++++++++
 .../paimon/flink/sink/CommitterOperator.java       |   8 +-
 .../org/apache/paimon/flink/sink/FlinkSink.java    |  20 +-
 .../AutoTagForSavepointCommitterOperatorTest.java  | 180 +++++++++++++++
 .../paimon/flink/sink/CommitterOperatorTest.java   |  97 ++++----
 13 files changed, 606 insertions(+), 50 deletions(-)

diff --git a/docs/content/maintenance/manage-tags.md 
b/docs/content/maintenance/manage-tags.md
index de7eb39d5..c7e6a0913 100644
--- a/docs/content/maintenance/manage-tags.md
+++ b/docs/content/maintenance/manage-tags.md
@@ -219,4 +219,36 @@ public class RollbackTo {
 
 {{< /tab >}}
 
-{{< /tabs >}}
\ No newline at end of file
+{{< /tabs >}}
+
+## Work with Flink Savepoint
+
+In Flink, we may consume from Kafka and then write to Paimon. Since Flink retains only a limited number of checkpoints,
+we trigger a savepoint at certain times (such as code upgrades or data updates) so that the state can
+be retained for a longer time and the job can be restored incrementally.
+
+A Paimon snapshot is similar to a Flink checkpoint: both expire automatically. However, Paimon's tag feature
+allows snapshots to be retained for a long time. Therefore, we can combine Paimon tags with Flink
+savepoints to incrementally recover a job from a specified savepoint.
+
+**Step 1: Enable automatic tag creation for savepoints.**
+
+Set `sink.savepoint.auto-tag` to `true` to automatically create a tag for the snapshot committed by each savepoint.
+
+**Step 2: Trigger savepoint.**
+
+Refer to [Flink savepoints](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/ops/state/savepoints/#operations)
+to learn how to configure and trigger a savepoint.
+
+**Step 3: Choose the tag corresponding to the savepoint.**
+
+The tag corresponding to a savepoint is named `savepoint-${checkpointId}`, where `checkpointId` is the checkpoint ID of the savepoint.
+You can query the tags through the [Tags Table]({{< ref "how-to/system-tables#tags-table" >}}).
+
+**Step 4: Roll back the Paimon table.**
+
+[Roll back]({{< ref "maintenance/manage-tags#rollback-to-tag" >}}) the Paimon table to the chosen tag.
+
+**Step 5: Restart from the savepoint.**
+
+Refer to [resuming from savepoints](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/ops/state/savepoints/#resuming-from-savepoints) to learn how to restart a job from a specified savepoint.
\ No newline at end of file
diff --git 
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html 
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index db571cf2f..ffcb5b9c9 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -140,6 +140,12 @@ under the License.
             <td>Boolean</td>
             <td>If true, flink sink will use managed memory for merge tree; 
otherwise, it will create an independent memory allocator.</td>
         </tr>
+        <tr>
+            <td><h5>sink.savepoint.auto-tag</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>If true, a tag will be automatically created for the snapshot created by flink savepoint.</td>
+        </tr>
         <tr>
             <td><h5>source.checkpoint-align.enabled</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index a6ef0a07a..ea939bf49 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -448,7 +448,9 @@ public abstract class AbstractFileStoreTable implements FileStoreTable {
         }
     }
 
-    private TagManager tagManager() {
+    /** Returns a new {@link TagManager} rooted at this table's {@code path}. */
+    @Override
+    public TagManager tagManager() {
         return new TagManager(fileIO, path);
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java
index 0a466cf80..1a66edc5e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java
@@ -23,6 +23,7 @@ import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
 
 /** A {@link Table} for data. */
 public interface DataTable extends InnerTable {
@@ -33,6 +34,9 @@ public interface DataTable extends InnerTable {
 
     SnapshotManager snapshotManager();
 
+    /** Returns the {@link TagManager} that reads and creates the tags of this table. */
+    TagManager tagManager();
+
     Path location();
 
     FileIO fileIO();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 2063859a3..ba8a4b4cc 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -51,6 +51,7 @@ import org.apache.paimon.types.VarCharType;
 import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.ProjectedRow;
 import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
 
 import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints;
 
@@ -152,6 +153,12 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
         return dataTable.snapshotManager();
     }
 
+    /** Returns the tag manager of the wrapped data table. */
+    @Override
+    public TagManager tagManager() {
+        return dataTable.tagManager();
+    }
+
     @Override
     public InnerTableRead newRead() {
         return new AuditLogRead(dataTable.newRead());
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
index 971b97d3c..31f2665af 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
@@ -44,6 +44,7 @@ import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.IteratorRecordReader;
 import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -81,6 +82,12 @@ public class BucketsTable implements DataTable, ReadonlyTable {
         return wrapped.snapshotManager();
     }
 
+    /** Returns the tag manager of the wrapped table. */
+    @Override
+    public TagManager tagManager() {
+        return wrapped.tagManager();
+    }
+
     @Override
     public String name() {
         return "__internal_buckets_" + wrapped.location().getName();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index b44d61d12..0c8832ca3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -22,13 +22,19 @@ import org.apache.paimon.Snapshot;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BinaryOperator;
 import java.util.function.Function;
@@ -214,6 +220,54 @@ public class SnapshotManager implements Serializable {
         return Optional.empty();
     }
 
+    /**
+     * Finds the snapshots committed by {@code user} whose commit identifier is one of {@code
+     * identifiers}.
+     *
+     * <p>Snapshots are scanned from the latest one backwards, and only retained snapshots (from
+     * the earliest to the latest) are visited. The scan stops once every identifier has been
+     * matched, or once a snapshot of {@code user} with a commit identifier not larger than the
+     * smallest searched identifier is reached; this relies on the commit identifiers of one user
+     * increasing with the snapshot id.
+     *
+     * @param user the commit user that must have written the snapshots
+     * @param identifiers the commit identifiers to search for
+     * @return the matched snapshots, latest first; identifiers without a matching snapshot are
+     *     skipped
+     */
+    public List<Snapshot> findSnapshotsForIdentifiers(
+            @Nonnull String user, List<Long> identifiers) {
+        if (identifiers.isEmpty()) {
+            return Collections.emptyList();
+        }
+        Long latestId = latestSnapshotId();
+        if (latestId == null) {
+            return Collections.emptyList();
+        }
+        long earliestId =
+                Preconditions.checkNotNull(
+                        earliestSnapshotId(),
+                        "Latest snapshot id is not null, but earliest snapshot id is null. "
+                                + "This is unexpected.");
+
+        long minSearchedIdentifier = Collections.min(identifiers);
+        List<Snapshot> matchedSnapshots = new ArrayList<>();
+        Set<Long> remainingIdentifiers = new HashSet<>(identifiers);
+        for (long id = latestId; id >= earliestId && !remainingIdentifiers.isEmpty(); id--) {
+            Snapshot snapshot = snapshot(id);
+            if (user.equals(snapshot.commitUser())) {
+                if (remainingIdentifiers.remove(snapshot.commitIdentifier())) {
+                    matchedSnapshots.add(snapshot);
+                }
+                if (snapshot.commitIdentifier() <= minSearchedIdentifier) {
+                    // assumes a user's identifiers grow with the snapshot id, so older can't match
+                    break;
+                }
+            }
+        }
+        return matchedSnapshots;
+    }
+
     /**
      * Traversal snapshots from latest to earliest safely, this is applied on the writer side
      * because the committer may delete obsolete snapshots, which may cause the writer to encounter
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index de8485423..66f8bdd71 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -250,6 +250,13 @@ public class FlinkConnectorOptions {
                     .defaultValue(16)
                     .withDescription("The thread number for lookup async.");
 
+    public static final ConfigOption<Boolean> SINK_AUTO_TAG_FOR_SAVEPOINT =
+            ConfigOptions.key("sink.savepoint.auto-tag")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "If true, a tag will be automatically created for 
the snapshot created by flink savepoint.");
+
     public static List<ConfigOption<?>> getOptions() {
         final Field[] fields = FlinkConnectorOptions.class.getFields();
         final List<ConfigOption<?>> list = new ArrayList<>(fields.length);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java
new file mode 100644
index 000000000..318c36aba
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.utils.SerializableSupplier;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.SetupableStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * Commit {@link Committable} for each snapshot using the {@link CommitterOperator}. At the same
+ * time, tags are automatically created for each flink savepoint.
+ *
+ * <p>All operator methods are delegated to the wrapped {@link CommitterOperator}. The checkpoint
+ * ids of savepoints whose completion has not been confirmed yet are kept in operator state. Once
+ * such a savepoint is confirmed (or restored from state), the snapshot committed for it by this
+ * operator's commit user is tagged as {@code savepoint-<checkpointId>}.
+ */
+public class AutoTagForSavepointCommitterOperator<CommitT, GlobalCommitT>
+        implements OneInputStreamOperator<CommitT, CommitT>,
+                SetupableStreamOperator,
+                BoundedOneInput {
+    /** Prefix of the tags created for savepoints; the savepoint's checkpoint id is appended. */
+    public static final String SAVEPOINT_TAG_PREFIX = "savepoint-";
+
+    private static final long serialVersionUID = 1L;
+
+    private final CommitterOperator<CommitT, GlobalCommitT> commitOperator;
+
+    private final SerializableSupplier<SnapshotManager> snapshotManagerFactory;
+
+    private final SerializableSupplier<TagManager> tagManagerFactory;
+
+    /** Checkpoint ids of pending savepoints (neither completed nor aborted), in ascending order. */
+    private final NavigableSet<Long> identifiersForTags;
+
+    // Created from the factories in initializeState(), hence not serialized with the operator.
+    protected transient SnapshotManager snapshotManager;
+
+    protected transient TagManager tagManager;
+
+    private transient ListState<Long> identifiersForTagsState;
+
+    public AutoTagForSavepointCommitterOperator(
+            CommitterOperator<CommitT, GlobalCommitT> commitOperator,
+            SerializableSupplier<SnapshotManager> snapshotManagerFactory,
+            SerializableSupplier<TagManager> tagManagerFactory) {
+        this.commitOperator = commitOperator;
+        this.tagManagerFactory = tagManagerFactory;
+        this.snapshotManagerFactory = snapshotManagerFactory;
+        this.identifiersForTags = new TreeSet<>();
+    }
+
+    @Override
+    public void initializeState(StreamTaskStateInitializer streamTaskStateManager)
+            throws Exception {
+        try {
+            commitOperator.initializeState(streamTaskStateManager);
+        } finally {
+            // Also runs when the wrapped committer throws after committing restored checkpoints,
+            // so that savepoints restored from state are still tagged.
+            snapshotManager = snapshotManagerFactory.get();
+            tagManager = tagManagerFactory.get();
+
+            identifiersForTagsState =
+                    commitOperator
+                            .getOperatorStateBackend()
+                            .getListState(
+                                    new ListStateDescriptor<>(
+                                            "streaming_committer_for_tags_states",
+                                            LongSerializer.INSTANCE));
+            List<Long> restored = new ArrayList<>();
+            identifiersForTagsState.get().forEach(restored::add);
+            identifiersForTagsState.clear();
+            createTagForIdentifiers(restored);
+        }
+    }
+
+    @Override
+    public OperatorSnapshotFutures snapshotState(
+            long checkpointId,
+            long timestamp,
+            CheckpointOptions checkpointOptions,
+            CheckpointStreamFactory storageLocation)
+            throws Exception {
+        if (checkpointOptions.getCheckpointType().isSavepoint()) {
+            identifiersForTags.add(checkpointId);
+        }
+        // Persist pending savepoints so that they can still be tagged after a restore.
+        identifiersForTagsState.update(new ArrayList<>(identifiersForTags));
+        return commitOperator.snapshotState(
+                checkpointId, timestamp, checkpointOptions, storageLocation);
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        commitOperator.notifyCheckpointComplete(checkpointId);
+        // Flink does not guarantee a notification for every checkpoint; a notification for a
+        // later checkpoint subsumes the earlier ones. Tag every pending savepoint up to and
+        // including this checkpoint, otherwise a savepoint whose notification was skipped would
+        // never be tagged and would stay in the pending set forever.
+        Set<Long> completed = identifiersForTags.headSet(checkpointId, true);
+        if (!completed.isEmpty()) {
+            List<Long> identifiers = new ArrayList<>(completed);
+            // clearing the head-set view also removes these ids from identifiersForTags
+            completed.clear();
+            createTagForIdentifiers(identifiers);
+        }
+    }
+
+    @Override
+    public void notifyCheckpointAborted(long checkpointId) throws Exception {
+        commitOperator.notifyCheckpointAborted(checkpointId);
+        identifiersForTags.remove(checkpointId);
+    }
+
+    /**
+     * Tags every snapshot that this operator's commit user committed for one of the given
+     * identifiers. Identifiers without a matching snapshot are skipped and existing tags are left
+     * untouched.
+     */
+    private void createTagForIdentifiers(List<Long> identifiers) {
+        List<Snapshot> snapshotForTags =
+                snapshotManager.findSnapshotsForIdentifiers(
+                        commitOperator.getCommitUser(), identifiers);
+        for (Snapshot snapshot : snapshotForTags) {
+            String tagName = SAVEPOINT_TAG_PREFIX + snapshot.commitIdentifier();
+            if (!tagManager.tagExists(tagName)) {
+                tagManager.createTag(snapshot, tagName);
+            }
+        }
+    }
+
+    @Override
+    public void open() throws Exception {
+        commitOperator.open();
+    }
+
+    @Override
+    public void processElement(StreamRecord<CommitT> element) throws Exception {
+        commitOperator.processElement(element);
+    }
+
+    @Override
+    public void processWatermark(Watermark watermark) throws Exception {
+        commitOperator.processWatermark(watermark);
+    }
+
+    @Override
+    public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
+        commitOperator.processWatermarkStatus(watermarkStatus);
+    }
+
+    @Override
+    public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
+        commitOperator.processLatencyMarker(latencyMarker);
+    }
+
+    @Override
+    public void finish() throws Exception {
+        commitOperator.finish();
+    }
+
+    @Override
+    public void close() throws Exception {
+        commitOperator.close();
+    }
+
+    @Override
+    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+        commitOperator.prepareSnapshotPreBarrier(checkpointId);
+    }
+
+    @Override
+    public void setKeyContextElement1(StreamRecord<?> record) throws Exception {
+        commitOperator.setKeyContextElement1(record);
+    }
+
+    @Override
+    public void setKeyContextElement2(StreamRecord<?> record) throws Exception {
+        commitOperator.setKeyContextElement2(record);
+    }
+
+    @Override
+    public OperatorMetricGroup getMetricGroup() {
+        return commitOperator.getMetricGroup();
+    }
+
+    @Override
+    public OperatorID getOperatorID() {
+        return commitOperator.getOperatorID();
+    }
+
+    @Override
+    public void setCurrentKey(Object key) {
+        commitOperator.setCurrentKey(key);
+    }
+
+    @Override
+    public Object getCurrentKey() {
+        return commitOperator.getCurrentKey();
+    }
+
+    @Override
+    public void setKeyContextElement(StreamRecord<CommitT> record) throws Exception {
+        commitOperator.setKeyContextElement(record);
+    }
+
+    @Override
+    public void endInput() throws Exception {
+        commitOperator.endInput();
+    }
+
+    @Override
+    public void setup(StreamTask containingTask, StreamConfig config, Output output) {
+        commitOperator.setup(containingTask, config, output);
+    }
+
+    @Override
+    public ChainingStrategy getChainingStrategy() {
+        return commitOperator.getChainingStrategy();
+    }
+
+    @Override
+    public void setChainingStrategy(ChainingStrategy strategy) {
+        commitOperator.setChainingStrategy(strategy);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
index c0e9f62e3..d7531bc99 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
@@ -79,6 +79,8 @@ public class CommitterOperator<CommitT, GlobalCommitT> 
extends AbstractStreamOpe
 
     private transient boolean endInput;
 
+    private transient String commitUser;
+
     public CommitterOperator(
             boolean streamingCheckpointEnabled,
             String initialCommitUser,
@@ -101,7 +103,7 @@ public class CommitterOperator<CommitT, GlobalCommitT> 
extends AbstractStreamOpe
         // each job can only have one user name and this name must be 
consistent across restarts
         // we cannot use job id as commit user name here because user may 
change job id by creating
         // a savepoint, stop the job and then resume from savepoint
-        String commitUser =
+        commitUser =
                 StateUtils.getSingleValueFromState(
                         context, "commit_user_state", String.class, 
initialCommitUser);
         // parallelism of commit operator is always 1, so commitUser will 
never be null
@@ -174,6 +176,14 @@ public class CommitterOperator<CommitT, GlobalCommitT> extends AbstractStreamOpe
         super.close();
     }
 
+    /**
+     * Returns the commit user taken from the {@code commit_user_state} operator state (falling
+     * back to the initial commit user). It is {@code null} until that state has been initialized.
+     */
+    public String getCommitUser() {
+        return commitUser;
+    }
+
     private void pollInputs() throws Exception {
         Map<Long, List<CommitT>> grouped = committer.groupByCheckpoint(inputs);
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 5c60b0304..e72e35adb 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -46,6 +46,7 @@ import java.util.UUID;
 import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_LOOKUP_WAIT;
+import static 
org.apache.paimon.flink.FlinkConnectorOptions.SINK_AUTO_TAG_FOR_SAVEPOINT;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.SINK_MANAGED_WRITER_BUFFER_MEMORY;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.SINK_USE_MANAGED_MEMORY;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -184,15 +185,24 @@ public abstract class FlinkSink<T> implements 
Serializable {
             assertStreamingConfiguration(env);
         }
 
+        CommitterOperator<Committable, ManifestCommittable> committer =
+                new CommitterOperator<>(
+                        streamingCheckpointEnabled,
+                        commitUser,
+                        createCommitterFactory(streamingCheckpointEnabled),
+                        createCommittableStateManager());
+        OneInputStreamOperator<Committable, Committable> committerOperator = committer;
+        if (Options.fromMap(table.options()).get(SINK_AUTO_TAG_FOR_SAVEPOINT)) {
+            // additionally create a tag for the snapshot committed by every savepoint
+            committerOperator =
+                    new AutoTagForSavepointCommitterOperator<>(
+                            committer, table::snapshotManager, table::tagManager);
+        }
         SingleOutputStreamOperator<?> committed =
                 written.transform(
                                 GLOBAL_COMMITTER_NAME + " : " + table.name(),
                                 new CommittableTypeInfo(),
-                                new CommitterOperator<>(
-                                        streamingCheckpointEnabled,
-                                        commitUser,
-                                        
createCommitterFactory(streamingCheckpointEnabled),
-                                        createCommittableStateManager()))
+                                committerOperator)
                         .setParallelism(1)
                         .setMaxParallelism(1);
         return committed.addSink(new 
DiscardingSink<>()).name("end").setParallelism(1);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java
new file mode 100644
index 000000000..7b44a1930
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.utils.ThrowingConsumer;
+
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.SavepointType;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
+
+/** Tests for {@link AutoTagForSavepointCommitterOperator}. */
+public class AutoTagForSavepointCommitterOperatorTest extends CommitterOperatorTest {
+
+    @Test
+    public void testAutoTagForSavepoint() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+
+        OneInputStreamOperatorTestHarness<Committable, Committable> testHarness =
+                createRecoverableTestHarness(table);
+        testHarness.open();
+        StreamTableWrite write =
+                table.newStreamWriteBuilder().withCommitUser(initialCommitUser).newWrite();
+
+        long checkpointId = 1L, timestamp = 1L;
+        processCommittable(testHarness, write, checkpointId, timestamp, GenericRow.of(1, 10L));
+
+        // checkpoint is completed but not notified, so no snapshot is committed
+        testHarness.snapshotWithLocalState(checkpointId, timestamp, CheckpointType.CHECKPOINT);
+        assertThat(table.snapshotManager().latestSnapshotId()).isNull();
+
+        // notify checkpoint success and no tags will be created
+        testHarness.notifyOfCompletedCheckpoint(checkpointId);
+        assertThat(table.snapshotManager().snapshotCount()).isEqualTo(1);
+        assertThat(table.tagManager().tagCount()).isEqualTo(0);
+
+        processCommittable(testHarness, write, ++checkpointId, ++timestamp, GenericRow.of(2, 20L));
+
+        // trigger savepoint but not notified
+        testHarness.snapshotWithLocalState(
+                checkpointId, timestamp, SavepointType.savepoint(SavepointFormatType.CANONICAL));
+        assertThat(table.snapshotManager().snapshotCount()).isEqualTo(1);
+        assertThat(table.tagManager().tagCount()).isEqualTo(0);
+
+        // notify savepoint success
+        testHarness.notifyOfCompletedCheckpoint(checkpointId);
+        Snapshot snapshot = table.snapshotManager().latestSnapshot();
+        assertThat(snapshot).isNotNull();
+        assertThat(snapshot.id()).isEqualTo(checkpointId);
+
+        Map<Snapshot, String> tags = table.tagManager().tags();
+        assertThat(tags).containsOnlyKeys(snapshot);
+        assertThat(tags.get(snapshot))
+                .isEqualTo(
+                        AutoTagForSavepointCommitterOperator.SAVEPOINT_TAG_PREFIX + checkpointId);
+        testHarness.close();
+        write.close();
+    }
+
+    @Test
+    public void testRestore() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+
+        OneInputStreamOperatorTestHarness<Committable, Committable> testHarness =
+                createRecoverableTestHarness(table);
+        testHarness.open();
+        StreamTableWrite write =
+                table.newStreamWriteBuilder().withCommitUser(initialCommitUser).newWrite();
+
+        long checkpointId = 1L, timestamp = 1L;
+        processCommittable(testHarness, write, checkpointId, timestamp, GenericRow.of(1, 10L));
+
+        // trigger savepoint but not notified
+        OperatorSubtaskState subtaskState =
+                testHarness
+                        .snapshotWithLocalState(
+                                checkpointId,
+                                timestamp,
+                                SavepointType.savepoint(SavepointFormatType.CANONICAL))
+                        .getJobManagerOwnedState();
+        assertThat(table.snapshotManager().latestSnapshot()).isNull();
+        assertThat(table.tagManager().tagCount()).isEqualTo(0);
+
+        OneInputStreamOperatorTestHarness<Committable, Committable> restoredHarness =
+                createRecoverableTestHarness(table);
+        try {
+            // commit snapshot from state, fail intentionally
+            restoredHarness.initializeState(subtaskState);
+            restoredHarness.open();
+            fail("Expecting intentional exception");
+        } catch (Exception e) {
+            assertThat(e)
+                    .hasMessageContaining(
+                            "This exception is intentionally thrown "
+                                    + "after committing the restored checkpoints. "
+                                    + "By restarting the job we hope that "
+                                    + "writers can start writing based on these new commits.");
+        }
+        Snapshot snapshot = table.snapshotManager().latestSnapshot();
+        assertThat(snapshot).isNotNull();
+        assertThat(snapshot.id()).isEqualTo(checkpointId);
+
+        Map<Snapshot, String> tags = table.tagManager().tags();
+        assertThat(tags).containsOnlyKeys(snapshot);
+        assertThat(tags.get(snapshot))
+                .isEqualTo(
+                        AutoTagForSavepointCommitterOperator.SAVEPOINT_TAG_PREFIX + checkpointId);
+        testHarness.close();
+        write.close();
+    }
+
+    private void processCommittable(
+            OneInputStreamOperatorTestHarness<Committable, Committable> testHarness,
+            StreamTableWrite write,
+            long checkpointId,
+            long timestamp,
+            InternalRow... rows)
+            throws Exception {
+        for (InternalRow row : rows) {
+            write.write(row);
+        }
+        for (CommitMessage committable : write.prepareCommit(false, checkpointId)) {
+            testHarness.processElement(
+                    new Committable(checkpointId, Committable.Kind.FILE, committable), timestamp);
+        }
+    }
+
+    @Override
+    protected OneInputStreamOperator<Committable, Committable> createCommitterOperator(
+            FileStoreTable table,
+            String commitUser,
+            CommittableStateManager<ManifestCommittable> committableStateManager) {
+        return new AutoTagForSavepointCommitterOperator<>(
+                (CommitterOperator<Committable, ManifestCommittable>)
+                        super.createCommitterOperator(table, commitUser, committableStateManager),
+                table::snapshotManager,
+                table::tagManager);
+    }
+
+    @Override
+    protected OneInputStreamOperator<Committable, Committable> createCommitterOperator(
+            FileStoreTable table,
+            String commitUser,
+            CommittableStateManager<ManifestCommittable> committableStateManager,
+            ThrowingConsumer<StateInitializationContext, Exception> initializeFunction) {
+        return new AutoTagForSavepointCommitterOperator<>(
+                (CommitterOperator<Committable, ManifestCommittable>)
+                        super.createCommitterOperator(
+                                table, commitUser, committableStateManager, initializeFunction),
+                table::snapshotManager,
+                table::tagManager);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
index 74d8561bd..ea089b34a 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
@@ -29,6 +29,7 @@ import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
 import org.apache.paimon.table.sink.StreamWriteBuilder;
 import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.ThrowingConsumer;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
 
@@ -39,6 +40,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -57,7 +59,7 @@ import static org.assertj.core.api.Assertions.fail;
 /** Tests for {@link CommitterOperator}. */
 public class CommitterOperatorTest extends CommitterOperatorTestBase {
 
-    private String initialCommitUser;
+    protected String initialCommitUser;
 
     @BeforeEach
     public void before() {
@@ -241,27 +243,19 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
         // 3. Check whether success
         List<String> actual = new ArrayList<>();
 
-        CommitterOperator<Committable, ManifestCommittable> operator =
-                new CommitterOperator<Committable, ManifestCommittable>(
-                        true,
+        OneInputStreamOperator<Committable, Committable> operator =
+                createCommitterOperator(
+                        table,
                         initialCommitUser,
-                        user ->
-                                new StoreCommitter(
-                                        table.newStreamWriteBuilder()
-                                                .withCommitUser(user)
-                                                .newCommit()),
-                        new NoopCommittableStateManager()) {
-                    @Override
-                    public void initializeState(StateInitializationContext context)
-                            throws Exception {
-                        ListState<String> state =
-                                context.getOperatorStateStore()
-                                        .getUnionListState(
-                                                new ListStateDescriptor<>(
-                                                        "commit_user_state", String.class));
-                        state.get().forEach(actual::add);
-                    }
-                };
+                        new NoopCommittableStateManager(),
+                        context -> {
+                            ListState<String> state =
+                                    context.getOperatorStateStore()
+                                            .getUnionListState(
+                                                    new ListStateDescriptor<>(
+                                                            "commit_user_state", String.class));
+                            state.get().forEach(actual::add);
+                        });
 
         OneInputStreamOperatorTestHarness<Committable, Committable> testHarness1 =
                 createTestHarness(operator);
@@ -354,17 +348,12 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
     //  Test utils
     // ------------------------------------------------------------------------
 
-    private OneInputStreamOperatorTestHarness<Committable, Committable>
+    protected OneInputStreamOperatorTestHarness<Committable, Committable>
             createRecoverableTestHarness(FileStoreTable table) throws Exception {
-        CommitterOperator<Committable, ManifestCommittable> operator =
-                new CommitterOperator<>(
-                        true,
-                        initialCommitUser,
-                        user ->
-                                new StoreCommitter(
-                                        table.newStreamWriteBuilder()
-                                                .withCommitUser(user)
-                                                .newCommit()),
+        OneInputStreamOperator<Committable, Committable> operator =
+                createCommitterOperator(
+                        table,
+                        null,
                         new RestoreAndFailCommittableStateManager<>(
                                 () ->
                                         new VersionedSerializerWrapper<>(
@@ -379,21 +368,13 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
 
     private OneInputStreamOperatorTestHarness<Committable, Committable> createLossyTestHarness(
             FileStoreTable table, String commitUser) throws Exception {
-        CommitterOperator<Committable, ManifestCommittable> operator =
-                new CommitterOperator<>(
-                        true,
-                        commitUser == null ? initialCommitUser : commitUser,
-                        user ->
-                                new StoreCommitter(
-                                        table.newStreamWriteBuilder()
-                                                .withCommitUser(user)
-                                                .newCommit()),
-                        new NoopCommittableStateManager());
+        OneInputStreamOperator<Committable, Committable> operator =
+                createCommitterOperator(table, commitUser, new NoopCommittableStateManager());
         return createTestHarness(operator);
     }
 
     private OneInputStreamOperatorTestHarness<Committable, Committable> createTestHarness(
-            CommitterOperator<Committable, ManifestCommittable> operator) throws Exception {
+            OneInputStreamOperator<Committable, Committable> operator) throws Exception {
         TypeSerializer<Committable> serializer =
                 new CommittableTypeInfo().createSerializer(new ExecutionConfig());
         OneInputStreamOperatorTestHarness<Committable, Committable> harness =
@@ -401,4 +382,36 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
         harness.setup(serializer);
         return harness;
     }
+
+    protected OneInputStreamOperator<Committable, Committable> createCommitterOperator(
+            FileStoreTable table,
+            String commitUser,
+            CommittableStateManager<ManifestCommittable> committableStateManager) {
+        return new CommitterOperator<>(
+                true,
+                commitUser == null ? initialCommitUser : commitUser,
+                user ->
+                        new StoreCommitter(
+                                table.newStreamWriteBuilder().withCommitUser(user).newCommit()),
+                committableStateManager);
+    }
+
+    protected OneInputStreamOperator<Committable, Committable> createCommitterOperator(
+            FileStoreTable table,
+            String commitUser,
+            CommittableStateManager<ManifestCommittable> committableStateManager,
+            ThrowingConsumer<StateInitializationContext, Exception> initializeFunction) {
+        return new CommitterOperator<Committable, ManifestCommittable>(
+                true,
+                commitUser == null ? initialCommitUser : commitUser,
+                user ->
+                        new StoreCommitter(
+                                table.newStreamWriteBuilder().withCommitUser(user).newCommit()),
+                committableStateManager) {
+            @Override
+            public void initializeState(StateInitializationContext context) throws Exception {
+                initializeFunction.accept(context);
+            }
+        };
+    }
 }


Reply via email to