This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b51da2707 [flink] auto create new tag for flink savepoint. (#1741)
b51da2707 is described below
commit b51da2707cb188ba5a5b78de98bd777a89f737ef
Author: liming.1018 <[email protected]>
AuthorDate: Tue Aug 22 11:59:57 2023 +0800
[flink] auto create new tag for flink savepoint. (#1741)
---
docs/content/maintenance/manage-tags.md | 34 ++-
.../generated/flink_connector_configuration.html | 6 +
.../paimon/table/AbstractFileStoreTable.java | 3 +-
.../java/org/apache/paimon/table/DataTable.java | 3 +
.../apache/paimon/table/system/AuditLogTable.java | 6 +
.../apache/paimon/table/system/BucketsTable.java | 6 +
.../org/apache/paimon/utils/SnapshotManager.java | 39 ++++
.../apache/paimon/flink/FlinkConnectorOptions.java | 7 +
.../sink/AutoTagForSavepointCommitterOperator.java | 247 +++++++++++++++++++++
.../paimon/flink/sink/CommitterOperator.java | 8 +-
.../org/apache/paimon/flink/sink/FlinkSink.java | 20 +-
.../AutoTagForSavepointCommitterOperatorTest.java | 180 +++++++++++++++
.../paimon/flink/sink/CommitterOperatorTest.java | 97 ++++----
13 files changed, 606 insertions(+), 50 deletions(-)
diff --git a/docs/content/maintenance/manage-tags.md
b/docs/content/maintenance/manage-tags.md
index de7eb39d5..c7e6a0913 100644
--- a/docs/content/maintenance/manage-tags.md
+++ b/docs/content/maintenance/manage-tags.md
@@ -219,4 +219,36 @@ public class RollbackTo {
{{< /tab >}}
-{{< /tabs >}}
\ No newline at end of file
+{{< /tabs >}}
+
+## Work with Flink Savepoint
+
+In Flink, we may consume from Kafka and then write to Paimon. Since Flink
retains only a limited number of checkpoints,
+we trigger a savepoint at certain times (such as code upgrades, data
updates, etc.) to ensure that the state can
+be retained for a longer time, so that the job can be restored incrementally.
+
+Paimon's snapshot is similar to Flink's checkpoint, and both will
automatically expire, but the tag feature of Paimon
+allows snapshots to be retained for a long time. Therefore, we can combine
Paimon's tags with Flink's
+savepoints to achieve incremental recovery of a job from a specified savepoint.
+
+**Step 1: Enable automatic tag creation for savepoints.**
+
+You can set `sink.savepoint.auto-tag` to `true` to enable the feature of
automatically creating tags for savepoint.
+
+**Step 2: Trigger savepoint.**
+
+You can refer to [flink
savepoint](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/ops/state/savepoints/#operations)
+to learn how to configure and trigger savepoint.
+
+**Step 3: Choose the tag corresponding to the savepoint.**
+
+The tag corresponding to the savepoint will be named in the form of
`savepoint-${savepointID}`. You can refer to
+[Tags Table]({{< ref "how-to/system-tables#tags-table" >}}) to query it.
+
+**Step 4: Rollback the paimon table.**
+
+[Rollback]({{< ref "maintenance/manage-tags#rollback-to-tag" >}}) the paimon
table to the specified tag.
+
+**Step 5: Restart from the savepoint.**
+
+You can refer to
[here](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/ops/state/savepoints/#resuming-from-savepoints)
to learn how to restart from a specified savepoint.
\ No newline at end of file
diff --git
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index db571cf2f..ffcb5b9c9 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -140,6 +140,12 @@ under the License.
<td>Boolean</td>
<td>If true, flink sink will use managed memory for merge tree;
otherwise, it will create an independent memory allocator.</td>
</tr>
+ <tr>
+ <td><h5>sink.savepoint.auto-tag</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>If true, a tag will be automatically created for the snapshot
created by flink savepoint.</td>
+ </tr>
<tr>
<td><h5>source.checkpoint-align.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index a6ef0a07a..ea939bf49 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -448,7 +448,8 @@ public abstract class AbstractFileStoreTable implements
FileStoreTable {
}
}
- private TagManager tagManager() {
+ @Override
+ public TagManager tagManager() {
return new TagManager(fileIO, path);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java
index 0a466cf80..1a66edc5e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java
@@ -23,6 +23,7 @@ import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
/** A {@link Table} for data. */
public interface DataTable extends InnerTable {
@@ -33,6 +34,8 @@ public interface DataTable extends InnerTable {
SnapshotManager snapshotManager();
+ TagManager tagManager();
+
Path location();
FileIO fileIO();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 2063859a3..ba8a4b4cc 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -51,6 +51,7 @@ import org.apache.paimon.types.VarCharType;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.ProjectedRow;
import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints;
@@ -152,6 +153,11 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
return dataTable.snapshotManager();
}
+ @Override
+ public TagManager tagManager() {
+ return dataTable.tagManager();
+ }
+
@Override
public InnerTableRead newRead() {
return new AuditLogRead(dataTable.newRead());
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
index 971b97d3c..31f2665af 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
@@ -44,6 +44,7 @@ import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.IteratorRecordReader;
import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
import java.io.IOException;
import java.util.ArrayList;
@@ -81,6 +82,11 @@ public class BucketsTable implements DataTable,
ReadonlyTable {
return wrapped.snapshotManager();
}
+ @Override
+ public TagManager tagManager() {
+ return wrapped.tagManager();
+ }
+
@Override
public String name() {
return "__internal_buckets_" + wrapped.location().getName();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index b44d61d12..0c8832ca3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -22,13 +22,19 @@ import org.apache.paimon.Snapshot;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.Comparator;
+import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BinaryOperator;
import java.util.function.Function;
@@ -214,6 +220,39 @@ public class SnapshotManager implements Serializable {
return Optional.empty();
}
+ /** Find the snapshot of the specified identifiers written by the
specified user. */
+ public List<Snapshot> findSnapshotsForIdentifiers(
+ @Nonnull String user, List<Long> identifiers) {
+ if (identifiers.isEmpty()) {
+ return Collections.emptyList();
+ }
+ Long latestId = latestSnapshotId();
+ if (latestId == null) {
+ return Collections.emptyList();
+ }
+ long earliestId =
+ Preconditions.checkNotNull(
+ earliestSnapshotId(),
+ "Latest snapshot id is not null, but earliest snapshot
id is null. "
+ + "This is unexpected.");
+
+ long minSearchedIdentifier =
identifiers.stream().min(Long::compareTo).get();
+ List<Snapshot> matchedSnapshots = new ArrayList<>();
+ Set<Long> remainingIdentifiers = new HashSet<>(identifiers);
+ for (long id = latestId; id >= earliestId &&
!remainingIdentifiers.isEmpty(); id--) {
+ Snapshot snapshot = snapshot(id);
+ if (user.equals(snapshot.commitUser())) {
+ if (remainingIdentifiers.remove(snapshot.commitIdentifier())) {
+ matchedSnapshots.add(snapshot);
+ }
+ if (snapshot.commitIdentifier() <= minSearchedIdentifier) {
+ break;
+ }
+ }
+ }
+ return matchedSnapshots;
+ }
+
/**
* Traversal snapshots from latest to earliest safely, this is applied on
the writer side
* because the committer may delete obsolete snapshots, which may cause
the writer to encounter
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index de8485423..66f8bdd71 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -250,6 +250,13 @@ public class FlinkConnectorOptions {
.defaultValue(16)
.withDescription("The thread number for lookup async.");
+ public static final ConfigOption<Boolean> SINK_AUTO_TAG_FOR_SAVEPOINT =
+ ConfigOptions.key("sink.savepoint.auto-tag")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "If true, a tag will be automatically created for
the snapshot created by flink savepoint.");
+
public static List<ConfigOption<?>> getOptions() {
final Field[] fields = FlinkConnectorOptions.class.getFields();
final List<ConfigOption<?>> list = new ArrayList<>(fields.length);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java
new file mode 100644
index 000000000..318c36aba
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.utils.SerializableSupplier;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.SetupableStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Commit {@link Committable} for each snapshot using the {@link
CommitterOperator}. At the same
+ * time, tags are automatically created for each flink savepoint.
+ */
+public class AutoTagForSavepointCommitterOperator<CommitT, GlobalCommitT>
+ implements OneInputStreamOperator<CommitT, CommitT>,
+ SetupableStreamOperator,
+ BoundedOneInput {
+ public static final String SAVEPOINT_TAG_PREFIX = "savepoint-";
+
+ private static final long serialVersionUID = 1L;
+
+ private final CommitterOperator<CommitT, GlobalCommitT> commitOperator;
+
+ private final SerializableSupplier<SnapshotManager> snapshotManagerFactory;
+
+ private final SerializableSupplier<TagManager> tagManagerFactory;
+
+ private final Set<Long> identifiersForTags;
+
+ protected SnapshotManager snapshotManager;
+
+ protected TagManager tagManager;
+
+ private transient ListState<Long> identifiersForTagsState;
+
+ public AutoTagForSavepointCommitterOperator(
+ CommitterOperator<CommitT, GlobalCommitT> commitOperator,
+ SerializableSupplier<SnapshotManager> snapshotManagerFactory,
+ SerializableSupplier<TagManager> tagManagerFactory) {
+ this.commitOperator = commitOperator;
+ this.tagManagerFactory = tagManagerFactory;
+ this.snapshotManagerFactory = snapshotManagerFactory;
+ this.identifiersForTags = new HashSet<>();
+ }
+
+ @Override
+ public void initializeState(StreamTaskStateInitializer
streamTaskStateManager)
+ throws Exception {
+ try {
+ commitOperator.initializeState(streamTaskStateManager);
+ } finally {
+ snapshotManager = snapshotManagerFactory.get();
+ tagManager = tagManagerFactory.get();
+
+ identifiersForTagsState =
+ commitOperator
+ .getOperatorStateBackend()
+ .getListState(
+ new ListStateDescriptor<>(
+
"streaming_committer_for_tags_states",
+ LongSerializer.INSTANCE));
+ List<Long> restored = new ArrayList<>();
+ identifiersForTagsState.get().forEach(restored::add);
+ identifiersForTagsState.clear();
+ createTagForIdentifiers(restored);
+ }
+ }
+
+ @Override
+ public OperatorSnapshotFutures snapshotState(
+ long checkpointId,
+ long timestamp,
+ CheckpointOptions checkpointOptions,
+ CheckpointStreamFactory storageLocation)
+ throws Exception {
+ if (checkpointOptions.getCheckpointType().isSavepoint()) {
+ identifiersForTags.add(checkpointId);
+ }
+ identifiersForTagsState.update(new ArrayList<>(identifiersForTags));
+ return commitOperator.snapshotState(
+ checkpointId, timestamp, checkpointOptions, storageLocation);
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ commitOperator.notifyCheckpointComplete(checkpointId);
+ if (identifiersForTags.remove(checkpointId)) {
+ createTagForIdentifiers(Collections.singletonList(checkpointId));
+ }
+ }
+
+ @Override
+ public void notifyCheckpointAborted(long checkpointId) throws Exception {
+ commitOperator.notifyCheckpointAborted(checkpointId);
+ identifiersForTags.remove(checkpointId);
+ }
+
+ private void createTagForIdentifiers(List<Long> identifiers) {
+ List<Snapshot> snapshotForTags =
+ snapshotManager.findSnapshotsForIdentifiers(
+ commitOperator.getCommitUser(), identifiers);
+ for (Snapshot snapshot : snapshotForTags) {
+ String tagName = SAVEPOINT_TAG_PREFIX +
snapshot.commitIdentifier();
+ if (!tagManager.tagExists(tagName)) {
+ tagManager.createTag(snapshot, tagName);
+ }
+ }
+ }
+
+ @Override
+ public void open() throws Exception {
+ commitOperator.open();
+ }
+
+ @Override
+ public void processElement(StreamRecord<CommitT> element) throws Exception
{
+ commitOperator.processElement(element);
+ }
+
+ @Override
+ public void processWatermark(Watermark watermark) throws Exception {
+ commitOperator.processWatermark(watermark);
+ }
+
+ @Override
+ public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws
Exception {
+ commitOperator.processWatermarkStatus(watermarkStatus);
+ }
+
+ @Override
+ public void processLatencyMarker(LatencyMarker latencyMarker) throws
Exception {
+ commitOperator.processLatencyMarker(latencyMarker);
+ }
+
+ @Override
+ public void finish() throws Exception {
+ commitOperator.finish();
+ }
+
+ @Override
+ public void close() throws Exception {
+ commitOperator.close();
+ }
+
+ @Override
+ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+ commitOperator.prepareSnapshotPreBarrier(checkpointId);
+ }
+
+ @Override
+ public void setKeyContextElement1(StreamRecord<?> record) throws Exception
{
+ commitOperator.setKeyContextElement1(record);
+ }
+
+ @Override
+ public void setKeyContextElement2(StreamRecord<?> record) throws Exception
{
+ commitOperator.setKeyContextElement2(record);
+ }
+
+ @Override
+ public OperatorMetricGroup getMetricGroup() {
+ return commitOperator.getMetricGroup();
+ }
+
+ @Override
+ public OperatorID getOperatorID() {
+ return commitOperator.getOperatorID();
+ }
+
+ @Override
+ public void setCurrentKey(Object key) {
+ commitOperator.setCurrentKey(key);
+ }
+
+ @Override
+ public Object getCurrentKey() {
+ return commitOperator.getCurrentKey();
+ }
+
+ @Override
+ public void setKeyContextElement(StreamRecord<CommitT> record) throws
Exception {
+ commitOperator.setKeyContextElement(record);
+ }
+
+ @Override
+ public void endInput() throws Exception {
+ commitOperator.endInput();
+ }
+
+ @Override
+ public void setup(StreamTask containingTask, StreamConfig config, Output
output) {
+ commitOperator.setup(containingTask, config, output);
+ }
+
+ @Override
+ public ChainingStrategy getChainingStrategy() {
+ return commitOperator.getChainingStrategy();
+ }
+
+ @Override
+ public void setChainingStrategy(ChainingStrategy strategy) {
+ commitOperator.setChainingStrategy(strategy);
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
index c0e9f62e3..d7531bc99 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
@@ -79,6 +79,8 @@ public class CommitterOperator<CommitT, GlobalCommitT>
extends AbstractStreamOpe
private transient boolean endInput;
+ private transient String commitUser;
+
public CommitterOperator(
boolean streamingCheckpointEnabled,
String initialCommitUser,
@@ -101,7 +103,7 @@ public class CommitterOperator<CommitT, GlobalCommitT>
extends AbstractStreamOpe
// each job can only have one user name and this name must be
consistent across restarts
// we cannot use job id as commit user name here because user may
change job id by creating
// a savepoint, stop the job and then resume from savepoint
- String commitUser =
+ commitUser =
StateUtils.getSingleValueFromState(
context, "commit_user_state", String.class,
initialCommitUser);
// parallelism of commit operator is always 1, so commitUser will
never be null
@@ -174,6 +176,10 @@ public class CommitterOperator<CommitT, GlobalCommitT>
extends AbstractStreamOpe
super.close();
}
+ public String getCommitUser() {
+ return commitUser;
+ }
+
private void pollInputs() throws Exception {
Map<Long, List<CommitT>> grouped = committer.groupByCheckpoint(inputs);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 5c60b0304..e72e35adb 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -46,6 +46,7 @@ import java.util.UUID;
import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
import static
org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL;
import static
org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_LOOKUP_WAIT;
+import static
org.apache.paimon.flink.FlinkConnectorOptions.SINK_AUTO_TAG_FOR_SAVEPOINT;
import static
org.apache.paimon.flink.FlinkConnectorOptions.SINK_MANAGED_WRITER_BUFFER_MEMORY;
import static
org.apache.paimon.flink.FlinkConnectorOptions.SINK_USE_MANAGED_MEMORY;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -184,15 +185,24 @@ public abstract class FlinkSink<T> implements
Serializable {
assertStreamingConfiguration(env);
}
+ OneInputStreamOperator<Committable, Committable> committerOperator =
+ new CommitterOperator<>(
+ streamingCheckpointEnabled,
+ commitUser,
+ createCommitterFactory(streamingCheckpointEnabled),
+ createCommittableStateManager());
+ if (Options.fromMap(table.options()).get(SINK_AUTO_TAG_FOR_SAVEPOINT))
{
+ committerOperator =
+ new AutoTagForSavepointCommitterOperator<>(
+ (CommitterOperator<Committable,
ManifestCommittable>) committerOperator,
+ table::snapshotManager,
+ table::tagManager);
+ }
SingleOutputStreamOperator<?> committed =
written.transform(
GLOBAL_COMMITTER_NAME + " : " + table.name(),
new CommittableTypeInfo(),
- new CommitterOperator<>(
- streamingCheckpointEnabled,
- commitUser,
-
createCommitterFactory(streamingCheckpointEnabled),
- createCommittableStateManager()))
+ committerOperator)
.setParallelism(1)
.setMaxParallelism(1);
return committed.addSink(new
DiscardingSink<>()).name("end").setParallelism(1);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java
new file mode 100644
index 000000000..7b44a1930
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.utils.ThrowingConsumer;
+
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.SavepointType;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
+
+/** Tests for {@link AutoTagForSavepointCommitterOperator}. */
+public class AutoTagForSavepointCommitterOperatorTest extends
CommitterOperatorTest {
+
+ @Test
+ public void testAutoTagForSavepoint() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+
+ OneInputStreamOperatorTestHarness<Committable, Committable>
testHarness =
+ createRecoverableTestHarness(table);
+ testHarness.open();
+ StreamTableWrite write =
+
table.newStreamWriteBuilder().withCommitUser(initialCommitUser).newWrite();
+
+ long checkpointId = 1L, timestamp = 1L;
+ processCommittable(testHarness, write, checkpointId, timestamp,
GenericRow.of(1, 10L));
+
+ // checkpoint is completed but not notified, so no snapshot is
committed
+ testHarness.snapshotWithLocalState(checkpointId, timestamp,
CheckpointType.CHECKPOINT);
+ assertThat(table.snapshotManager().latestSnapshotId()).isNull();
+
+ // notify checkpoint success and no tags will be created
+ testHarness.notifyOfCompletedCheckpoint(checkpointId);
+ assertThat(table.snapshotManager().snapshotCount()).isEqualTo(1);
+ assertThat(table.tagManager().tagCount()).isEqualTo(0);
+
+ processCommittable(testHarness, write, ++checkpointId, ++timestamp,
GenericRow.of(2, 20L));
+
+ // trigger savepoint but not notified
+ testHarness.snapshotWithLocalState(
+ checkpointId, timestamp,
SavepointType.savepoint(SavepointFormatType.CANONICAL));
+ assertThat(table.snapshotManager().snapshotCount()).isEqualTo(1);
+ assertThat(table.tagManager().tagCount()).isEqualTo(0);
+
+ // notify savepoint success
+ testHarness.notifyOfCompletedCheckpoint(checkpointId);
+ Snapshot snapshot = table.snapshotManager().latestSnapshot();
+ assertThat(snapshot).isNotNull();
+ assertThat(snapshot.id()).isEqualTo(checkpointId);
+
+ Map<Snapshot, String> tags = table.tagManager().tags();
+ assertThat(tags).containsOnlyKeys(snapshot);
+ assertThat(tags.get(snapshot))
+ .isEqualTo(
+
AutoTagForSavepointCommitterOperator.SAVEPOINT_TAG_PREFIX + checkpointId);
+ }
+
+ @Test
+ public void testRestore() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+
+ OneInputStreamOperatorTestHarness<Committable, Committable>
testHarness =
+ createRecoverableTestHarness(table);
+ testHarness.open();
+ StreamTableWrite write =
+
table.newStreamWriteBuilder().withCommitUser(initialCommitUser).newWrite();
+
+ long checkpointId = 1L, timestamp = 1L;
+ processCommittable(testHarness, write, checkpointId, timestamp,
GenericRow.of(1, 10L));
+
+ // trigger savepoint but not notified
+ OperatorSubtaskState subtaskState =
+ testHarness
+ .snapshotWithLocalState(
+ checkpointId,
+ timestamp,
+
SavepointType.savepoint(SavepointFormatType.CANONICAL))
+ .getJobManagerOwnedState();
+ assertThat(table.snapshotManager().latestSnapshot()).isNull();
+ assertThat(table.tagManager().tagCount()).isEqualTo(0);
+
+ testHarness = createRecoverableTestHarness(table);
+ try {
+ // commit snapshot from state, fail intentionally
+ testHarness.initializeState(subtaskState);
+ testHarness.open();
+ fail("Expecting intentional exception");
+ } catch (Exception e) {
+ assertThat(e)
+ .hasMessageContaining(
+ "This exception is intentionally thrown "
+ + "after committing the restored
checkpoints. "
+ + "By restarting the job we hope that "
+ + "writers can start writing based on
these new commits.");
+ }
+ Snapshot snapshot = table.snapshotManager().latestSnapshot();
+ assertThat(snapshot).isNotNull();
+ assertThat(snapshot.id()).isEqualTo(checkpointId);
+
+ Map<Snapshot, String> tags = table.tagManager().tags();
+ assertThat(tags).containsOnlyKeys(snapshot);
+ assertThat(tags.get(snapshot))
+ .isEqualTo(
+
AutoTagForSavepointCommitterOperator.SAVEPOINT_TAG_PREFIX + checkpointId);
+ }
+
+ private void processCommittable(
+ OneInputStreamOperatorTestHarness<Committable, Committable>
testHarness,
+ StreamTableWrite write,
+ long checkpointId,
+ long timestamp,
+ InternalRow... rows)
+ throws Exception {
+ for (InternalRow row : rows) {
+ write.write(row);
+ }
+ for (CommitMessage committable : write.prepareCommit(false,
checkpointId)) {
+ testHarness.processElement(
+ new Committable(checkpointId, Committable.Kind.FILE,
committable), timestamp);
+ }
+ }
+
+ @Override
+ protected OneInputStreamOperator<Committable, Committable>
createCommitterOperator(
+ FileStoreTable table,
+ String commitUser,
+ CommittableStateManager<ManifestCommittable>
committableStateManager) {
+ return new AutoTagForSavepointCommitterOperator<>(
+ (CommitterOperator<Committable, ManifestCommittable>)
+ super.createCommitterOperator(table, commitUser,
committableStateManager),
+ table::snapshotManager,
+ table::tagManager);
+ }
+
+ @Override
+ protected OneInputStreamOperator<Committable, Committable>
createCommitterOperator(
+ FileStoreTable table,
+ String commitUser,
+ CommittableStateManager<ManifestCommittable>
committableStateManager,
+ ThrowingConsumer<StateInitializationContext, Exception>
initializeFunction) {
+ return new AutoTagForSavepointCommitterOperator<>(
+ (CommitterOperator<Committable, ManifestCommittable>)
+ super.createCommitterOperator(
+ table, commitUser, committableStateManager,
initializeFunction),
+ table::snapshotManager,
+ table::tagManager);
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
index 74d8561bd..ea089b34a 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
@@ -29,6 +29,7 @@ import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.ThrowingConsumer;
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
@@ -39,6 +40,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -57,7 +59,7 @@ import static org.assertj.core.api.Assertions.fail;
/** Tests for {@link CommitterOperator}. */
public class CommitterOperatorTest extends CommitterOperatorTestBase {
- private String initialCommitUser;
+ protected String initialCommitUser;
@BeforeEach
public void before() {
@@ -241,27 +243,19 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
// 3. Check whether success
List<String> actual = new ArrayList<>();
- CommitterOperator<Committable, ManifestCommittable> operator =
- new CommitterOperator<Committable, ManifestCommittable>(
- true,
+ OneInputStreamOperator<Committable, Committable> operator =
+ createCommitterOperator(
+ table,
initialCommitUser,
- user ->
- new StoreCommitter(
- table.newStreamWriteBuilder()
- .withCommitUser(user)
- .newCommit()),
- new NoopCommittableStateManager()) {
- @Override
- public void initializeState(StateInitializationContext
context)
- throws Exception {
- ListState<String> state =
- context.getOperatorStateStore()
- .getUnionListState(
- new ListStateDescriptor<>(
- "commit_user_state",
String.class));
- state.get().forEach(actual::add);
- }
- };
+ new NoopCommittableStateManager(),
+ context -> {
+ ListState<String> state =
+ context.getOperatorStateStore()
+ .getUnionListState(
+ new ListStateDescriptor<>(
+
"commit_user_state", String.class));
+ state.get().forEach(actual::add);
+ });
OneInputStreamOperatorTestHarness<Committable, Committable>
testHarness1 =
createTestHarness(operator);
@@ -354,17 +348,12 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
// Test utils
// ------------------------------------------------------------------------
- private OneInputStreamOperatorTestHarness<Committable, Committable>
+ protected OneInputStreamOperatorTestHarness<Committable, Committable>
createRecoverableTestHarness(FileStoreTable table) throws
Exception {
- CommitterOperator<Committable, ManifestCommittable> operator =
- new CommitterOperator<>(
- true,
- initialCommitUser,
- user ->
- new StoreCommitter(
- table.newStreamWriteBuilder()
- .withCommitUser(user)
- .newCommit()),
+ OneInputStreamOperator<Committable, Committable> operator =
+ createCommitterOperator(
+ table,
+ null,
new RestoreAndFailCommittableStateManager<>(
() ->
new VersionedSerializerWrapper<>(
@@ -379,21 +368,13 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
private OneInputStreamOperatorTestHarness<Committable, Committable>
createLossyTestHarness(
FileStoreTable table, String commitUser) throws Exception {
- CommitterOperator<Committable, ManifestCommittable> operator =
- new CommitterOperator<>(
- true,
- commitUser == null ? initialCommitUser : commitUser,
- user ->
- new StoreCommitter(
- table.newStreamWriteBuilder()
- .withCommitUser(user)
- .newCommit()),
- new NoopCommittableStateManager());
+ OneInputStreamOperator<Committable, Committable> operator =
+ createCommitterOperator(table, commitUser, new
NoopCommittableStateManager());
return createTestHarness(operator);
}
private OneInputStreamOperatorTestHarness<Committable, Committable>
createTestHarness(
- CommitterOperator<Committable, ManifestCommittable> operator)
throws Exception {
+ OneInputStreamOperator<Committable, Committable> operator) throws
Exception {
TypeSerializer<Committable> serializer =
new CommittableTypeInfo().createSerializer(new
ExecutionConfig());
OneInputStreamOperatorTestHarness<Committable, Committable> harness =
@@ -401,4 +382,36 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
harness.setup(serializer);
return harness;
}
+
+ protected OneInputStreamOperator<Committable, Committable>
createCommitterOperator(
+ FileStoreTable table,
+ String commitUser,
+ CommittableStateManager<ManifestCommittable>
committableStateManager) {
+ return new CommitterOperator<>(
+ true,
+ commitUser == null ? initialCommitUser : commitUser,
+ user ->
+ new StoreCommitter(
+
table.newStreamWriteBuilder().withCommitUser(user).newCommit()),
+ committableStateManager);
+ }
+
+ protected OneInputStreamOperator<Committable, Committable>
createCommitterOperator(
+ FileStoreTable table,
+ String commitUser,
+ CommittableStateManager<ManifestCommittable>
committableStateManager,
+ ThrowingConsumer<StateInitializationContext, Exception>
initializeFunction) {
+ return new CommitterOperator<Committable, ManifestCommittable>(
+ true,
+ commitUser == null ? initialCommitUser : commitUser,
+ user ->
+ new StoreCommitter(
+
table.newStreamWriteBuilder().withCommitUser(user).newCommit()),
+ committableStateManager) {
+ @Override
+ public void initializeState(StateInitializationContext context)
throws Exception {
+ initializeFunction.accept(context);
+ }
+ };
+ }
}