This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e3edd69e45 [core] Introduce RollbackToWatermarkProcedure for rollback
(#4687)
e3edd69e45 is described below
commit e3edd69e45434facf2f84b978f60018a69a335d9
Author: xuzifu666 <[email protected]>
AuthorDate: Wed Dec 11 21:48:22 2024 +0800
[core] Introduce RollbackToWatermarkProcedure for rollback (#4687)
---
docs/content/flink/procedures.md | 22 +++++
docs/content/spark/procedures.md | 11 +++
.../org/apache/paimon/utils/SnapshotManager.java | 59 ++++++++++++
.../procedure/RollbackToWatermarkProcedure.java | 59 ++++++++++++
.../procedure/RollbackToWatermarkProcedure.java | 66 +++++++++++++
.../services/org.apache.paimon.factories.Factory | 1 +
.../RollbackToWatermarkProcedureITCase.java | 79 ++++++++++++++++
.../org/apache/paimon/spark/SparkProcedures.java | 2 +
.../procedure/RollbackToWatermarkProcedure.java | 105 +++++++++++++++++++++
9 files changed, 404 insertions(+)
diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index 59b02f82bf..7a9b238073 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -412,6 +412,28 @@ All available procedures are listed below.
CALL sys.rollback_to_timestamp(`table` => 'default.T', timestamp =>
1730292023000)
</td>
</tr>
+ <tr>
+ <td>rollback_to_watermark</td>
+ <td>
+ -- for Flink 1.18<br/>
+ -- roll back to the snapshot whose watermark is earlier than or equal to the given watermark.<br/>
+ CALL sys.rollback_to_watermark('identifier', watermark)<br/><br/>
+ -- for Flink 1.19 and later<br/>
+ -- roll back to the snapshot whose watermark is earlier than or equal to the given watermark.<br/>
+ CALL sys.rollback_to_watermark(`table` => 'default.T', `watermark` => watermark)<br/><br/>
+ </td>
+ <td>
+ To roll back to the snapshot whose watermark is earlier than or equal to the given watermark. Arguments:
+ <li>identifier: the target table identifier. Cannot be empty.</li>
+ <li>watermark (Long): roll back to the snapshot whose watermark is earlier than or equal to this value.</li>
+ </td>
+ <td>
+ -- for Flink 1.18<br/>
+ CALL sys.rollback_to_watermark('default.T', 1730292023000)<br/><br/>
+ -- for Flink 1.19 and later<br/>
+ CALL sys.rollback_to_watermark(`table` => 'default.T', watermark => 1730292023000)
+ </td>
+ </tr>
<tr>
<td>expire_snapshots</td>
<td>
diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index 88d46fabbb..5b0efd5f90 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -179,6 +179,17 @@ This section introduce all available spark procedures
about paimon.
CALL sys.rollback_to_timestamp(table => 'default.T', timestamp =>
1730292023000)<br/><br/>
</td>
</tr>
+ <tr>
+ <td>rollback_to_watermark</td>
+ <td>
+ To roll back to the snapshot whose watermark is earlier than or equal to the given watermark. Arguments:
+ <li>table: the target table identifier. Cannot be empty.</li>
+ <li>watermark: roll back to the snapshot whose watermark is earlier than or equal to this value.</li>
+ </td>
+ <td>
+ CALL sys.rollback_to_watermark(table => 'default.T', watermark =>
1730292023000)<br/><br/>
+ </td>
+ </tr>
<tr>
<td>migrate_database</td>
<td>
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index cbe33ffaf4..eb7333366f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -366,6 +366,65 @@ public class SnapshotManager implements Serializable {
return finalSnapshot;
}
+    /**
+     * Returns the latest snapshot whose watermark is earlier than or equal to {@code watermark},
+     * or {@code null} if no such snapshot exists.
+     *
+     * <p>Snapshots whose watermark is {@code null} are skipped. The binary search assumes that
+     * watermarks are non-decreasing with snapshot id.
+     *
+     * @param watermark inclusive upper bound for the returned snapshot's watermark
+     * @return the latest snapshot with {@code watermark() <= watermark}, or {@code null}
+     */
+    public @Nullable Snapshot earlierOrEqualWatermark(long watermark) {
+        Long earliest = earliestSnapshotId();
+        Long latest = latestSnapshotId();
+        if (earliest == null || latest == null) {
+            return null;
+        }
+        // If the latest watermark is Long.MIN_VALUE, skip the search to save snapshot reads
+        // (presumably no snapshot then carries a real watermark). watermark() is a nullable
+        // Long: comparing it to a long with '==' unboxes it, so null must be checked first.
+        Long latestWatermark = snapshot(latest).watermark();
+        if (latestWatermark != null && latestWatermark == Long.MIN_VALUE) {
+            return null;
+        }
+
+        Snapshot result = null;
+        long low = earliest;
+        long high = latest;
+        while (low <= high) {
+            long mid = low + (high - low) / 2; // avoid overflow
+            // Walk down from mid to the nearest snapshot in [low, mid] that has a watermark.
+            // Never go below low, so only ids inside the searched range are read.
+            long probe = mid;
+            Snapshot candidate = snapshot(probe);
+            Long candidateWatermark = candidate.watermark();
+            while (candidateWatermark == null && probe > low) {
+                probe--;
+                candidate = snapshot(probe);
+                candidateWatermark = candidate.watermark();
+            }
+
+            if (candidateWatermark == null) {
+                // Nothing in [low, mid] has a watermark: continue in the right half.
+                low = mid + 1;
+            } else if (candidateWatermark > watermark) {
+                // The candidate (and, by monotonicity, everything after it) is too late.
+                high = probe - 1;
+            } else {
+                // The candidate qualifies. Snapshots in (probe, mid] have no watermark, so a
+                // later qualifying snapshot can only be found beyond mid.
+                result = candidate;
+                low = mid + 1;
+            }
+        }
+        return result;
+    }
+
public @Nullable Snapshot laterOrEqualWatermark(long watermark) {
Long earliest = earliestSnapshotId();
Long latest = latestSnapshotId();
diff --git
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedure.java
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedure.java
new file mode 100644
index 0000000000..da0b38f16b
--- /dev/null
+++
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedure.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/**
+ * Rollback to watermark procedure. Usage:
+ *
+ * <pre><code>
+ * -- rollback to the snapshot which earlier or equal than watermark.
+ * CALL sys.rollback_to_watermark('tableId', watermark)
+ * </code></pre>
+ */
+public class RollbackToWatermarkProcedure extends ProcedureBase {
+
+ public static final String IDENTIFIER = "rollback_to_watermark";
+
+ public String[] call(ProcedureContext procedureContext, String tableId,
long watermark)
+ throws Catalog.TableNotExistException {
+ Preconditions.checkNotNull(tableId, "table can not be empty");
+ Table table = catalog.getTable(Identifier.fromString(tableId));
+ FileStoreTable fileStoreTable = (FileStoreTable) table;
+ Snapshot snapshot =
fileStoreTable.snapshotManager().earlierOrEqualWatermark(watermark);
+ Preconditions.checkNotNull(
+ snapshot, String.format("count not find snapshot earlier than
%s", watermark));
+ long snapshotId = snapshot.id();
+ fileStoreTable.rollbackTo(snapshotId);
+ return new String[] {String.format("Success roll back to snapshot: %s
.", snapshotId)};
+ }
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedure.java
new file mode 100644
index 0000000000..ab1ea8080d
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedure.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/**
+ * Rollback to watermark procedure. Usage:
+ *
+ * <pre><code>
+ * -- rollback to the snapshot which earlier or equal than watermark.
+ * CALL sys.rollback_to_watermark(`table` => 'tableId', watermark =>
watermark)
+ * </code></pre>
+ */
+public class RollbackToWatermarkProcedure extends ProcedureBase {
+
+ public static final String IDENTIFIER = "rollback_to_watermark";
+
+ @ProcedureHint(
+ argument = {
+ @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
+ @ArgumentHint(name = "watermark", type =
@DataTypeHint("BIGINT"))
+ })
+ public String[] call(ProcedureContext procedureContext, String tableId,
Long watermark)
+ throws Catalog.TableNotExistException {
+ Table table = catalog.getTable(Identifier.fromString(tableId));
+ FileStoreTable fileStoreTable = (FileStoreTable) table;
+ Snapshot snapshot =
fileStoreTable.snapshotManager().earlierOrEqualWatermark(watermark);
+ Preconditions.checkNotNull(
+ snapshot, String.format("count not find snapshot earlier than
%s", watermark));
+ long snapshotId = snapshot.id();
+ fileStoreTable.rollbackTo(snapshotId);
+ return new String[] {String.format("Success roll back to snapshot: %s
.", snapshotId)};
+ }
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 0ff3ac1f1e..6c3b0e7664 100644
---
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -61,6 +61,7 @@ org.apache.paimon.flink.procedure.MergeIntoProcedure
org.apache.paimon.flink.procedure.ResetConsumerProcedure
org.apache.paimon.flink.procedure.RollbackToProcedure
org.apache.paimon.flink.procedure.RollbackToTimestampProcedure
+org.apache.paimon.flink.procedure.RollbackToWatermarkProcedure
org.apache.paimon.flink.procedure.MigrateTableProcedure
org.apache.paimon.flink.procedure.MigrateDatabaseProcedure
org.apache.paimon.flink.procedure.MigrateFileProcedure
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedureITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedureITCase.java
new file mode 100644
index 0000000000..f87ecd2475
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedureITCase.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.flink.CatalogITCaseBase;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT Case for {@link RollbackToWatermarkProcedure}. */
+public class RollbackToWatermarkProcedureITCase extends CatalogITCaseBase {
+
+ @Test
+ public void testCreateTagsFromSnapshotsWatermark() throws Exception {
+ sql(
+ "CREATE TABLE T ("
+ + " k STRING,"
+ + " dt STRING,"
+ + " PRIMARY KEY (k, dt) NOT ENFORCED"
+ + ") PARTITIONED BY (dt) WITH ("
+ + " 'bucket' = '1'"
+ + ")");
+
+ // create snapshot 1 with watermark 1000.
+ sql(
+ "insert into T/*+ OPTIONS('end-input.watermark'= '1000') */
values('k1', '2024-12-02')");
+ // create snapshot 2 with watermark 2000.
+ sql(
+ "insert into T/*+ OPTIONS('end-input.watermark'= '2000') */
values('k2', '2024-12-02')");
+ // create snapshot 3 with watermark 3000.
+ sql(
+ "insert into T/*+ OPTIONS('end-input.watermark'= '3000') */
values('k3', '2024-12-02')");
+
+ FileStoreTable table = paimonTable("T");
+
+ long watermark1 = table.snapshotManager().snapshot(1).watermark();
+ long watermark2 = table.snapshotManager().snapshot(2).watermark();
+ long watermark3 = table.snapshotManager().snapshot(3).watermark();
+
+ assertThat(watermark1 == 1000).isTrue();
+ assertThat(watermark2 == 2000).isTrue();
+ assertThat(watermark3 == 3000).isTrue();
+
+ assertThat(sql("select * from T").stream().map(Row::toString))
+ .containsExactlyInAnyOrder(
+ "+I[k1, 2024-12-02]", "+I[k2, 2024-12-02]", "+I[k3,
2024-12-02]");
+
+ sql("CALL sys.rollback_to_watermark(`table` => 'default.T',`watermark`
=> 2001)");
+
+ // check for snapshot 2
+ assertThat(sql("select * from T").stream().map(Row::toString))
+ .containsExactlyInAnyOrder("+I[k1, 2024-12-02]", "+I[k2,
2024-12-02]");
+
+ sql("CALL sys.rollback_to_watermark(`table` => 'default.T',`watermark`
=> 1001)");
+
+ // check for snapshot 1
+ assertThat(sql("select * from T").stream().map(Row::toString))
+ .containsExactlyInAnyOrder("+I[k1, 2024-12-02]");
+ }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
index 21f14e5d7a..b2fa66a150 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
@@ -43,6 +43,7 @@ import org.apache.paimon.spark.procedure.ReplaceTagProcedure;
import org.apache.paimon.spark.procedure.ResetConsumerProcedure;
import org.apache.paimon.spark.procedure.RollbackProcedure;
import org.apache.paimon.spark.procedure.RollbackToTimestampProcedure;
+import org.apache.paimon.spark.procedure.RollbackToWatermarkProcedure;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
@@ -72,6 +73,7 @@ public class SparkProcedures {
ImmutableMap.builder();
procedureBuilders.put("rollback", RollbackProcedure::builder);
procedureBuilders.put("rollback_to_timestamp",
RollbackToTimestampProcedure::builder);
+ procedureBuilders.put("rollback_to_watermark",
RollbackToWatermarkProcedure::builder);
procedureBuilders.put("create_tag", CreateTagProcedure::builder);
procedureBuilders.put("replace_tag", ReplaceTagProcedure::builder);
procedureBuilders.put("rename_tag", RenameTagProcedure::builder);
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackToWatermarkProcedure.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackToWatermarkProcedure.java
new file mode 100644
index 0000000000..09185f02c9
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackToWatermarkProcedure.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import static org.apache.spark.sql.types.DataTypes.LongType;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/** A procedure to rollback to a watermark. */
+public class RollbackToWatermarkProcedure extends BaseProcedure {
+
+ private static final ProcedureParameter[] PARAMETERS =
+ new ProcedureParameter[] {
+ ProcedureParameter.required("table", StringType),
+ // watermark value
+ ProcedureParameter.required("watermark", LongType)
+ };
+
+ private static final StructType OUTPUT_TYPE =
+ new StructType(
+ new StructField[] {
+ new StructField("result", StringType, true,
Metadata.empty())
+ });
+
+ private RollbackToWatermarkProcedure(TableCatalog tableCatalog) {
+ super(tableCatalog);
+ }
+
+ @Override
+ public ProcedureParameter[] parameters() {
+ return PARAMETERS;
+ }
+
+ @Override
+ public StructType outputType() {
+ return OUTPUT_TYPE;
+ }
+
+ @Override
+ public InternalRow[] call(InternalRow args) {
+ Identifier tableIdent = toIdentifier(args.getString(0),
PARAMETERS[0].name());
+ Long watermark = args.getLong(1);
+
+ return modifyPaimonTable(
+ tableIdent,
+ table -> {
+ FileStoreTable fileStoreTable = (FileStoreTable) table;
+ Snapshot snapshot =
+
fileStoreTable.snapshotManager().earlierOrEqualWatermark(watermark);
+ Preconditions.checkNotNull(
+ snapshot,
+ String.format("count not find snapshot earlier
than %s", watermark));
+ long snapshotId = snapshot.id();
+ fileStoreTable.rollbackTo(snapshotId);
+ InternalRow outputRow =
+ newInternalRow(
+ UTF8String.fromString(
+ String.format(
+ "Success roll back to
snapshot: %s .",
+ snapshotId)));
+ return new InternalRow[] {outputRow};
+ });
+ }
+
+ public static ProcedureBuilder builder() {
+ return new BaseProcedure.Builder<RollbackToWatermarkProcedure>() {
+ @Override
+ public RollbackToWatermarkProcedure doBuild() {
+ return new RollbackToWatermarkProcedure(tableCatalog());
+ }
+ };
+ }
+
+ @Override
+ public String description() {
+ return "RollbackToWatermarkProcedure";
+ }
+}