This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.0
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit d9efecf68a7ff8c735b289064234213a2a5d0f13
Author: xuzifu666 <[email protected]>
AuthorDate: Wed Jan 22 11:06:33 2025 +0800

    [spark] Fix rollback not correctly identify tag or snapshot (#4947)
---
 docs/content/spark/procedures.md                   | 10 ++--
 .../paimon/spark/procedure/RollbackProcedure.java  | 36 +++++++++++--
 .../spark/procedure/RollbackProcedureTest.scala    | 60 ++++++++++++++++++++++
 3 files changed, 98 insertions(+), 8 deletions(-)

diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index bf7b8ae2d5..13d24d4d33 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -159,13 +159,17 @@ This section introduce all available spark procedures 
about paimon.
     <tr>
       <td>rollback</td>
       <td>
-         To rollback to a specific version of target table. Argument:
+         To rollback to a specific version of target table. Note that exactly one of 
version/snapshot/tag must be set. Argument:
             <li>table: the target table identifier. Cannot be empty.</li>
-            <li>version: id of the snapshot or name of tag that will roll back 
to.</li>
+            <li>version: id of the snapshot or name of the tag to roll back 
to. The version argument will be deprecated.</li>
+            <li>snapshot: id of the snapshot to roll back to.</li>
+            <li>tag: name of the tag to roll back to.</li>
       </td>
       <td>
           CALL sys.rollback(table => 'default.T', version => 
'my_tag')<br/><br/>
-          CALL sys.rollback(table => 'default.T', version => 10)
+          CALL sys.rollback(table => 'default.T', version => 10)<br/><br/>
+          CALL sys.rollback(table => 'default.T', tag => 'tag1')<br/><br/>
+          CALL sys.rollback(table => 'default.T', snapshot => 2)
       </td>
     </tr>
     <tr>
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java
index 6d004e9466..d9a8876332 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java
@@ -18,6 +18,9 @@
 
 package org.apache.paimon.spark.procedure;
 
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.StringUtils;
+
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
@@ -26,6 +29,7 @@ import org.apache.spark.sql.types.Metadata;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 
+import static org.apache.spark.sql.types.DataTypes.LongType;
 import static org.apache.spark.sql.types.DataTypes.StringType;
 
 /** A procedure to rollback to a snapshot or a tag. */
@@ -35,7 +39,9 @@ public class RollbackProcedure extends BaseProcedure {
             new ProcedureParameter[] {
                 ProcedureParameter.required("table", StringType),
                 // snapshot id or tag name
-                ProcedureParameter.required("version", StringType)
+                ProcedureParameter.optional("version", StringType),
+                ProcedureParameter.optional("snapshot", LongType),
+                ProcedureParameter.optional("tag", StringType)
             };
 
     private static final StructType OUTPUT_TYPE =
@@ -61,15 +67,35 @@ public class RollbackProcedure extends BaseProcedure {
     @Override
     public InternalRow[] call(InternalRow args) {
         Identifier tableIdent = toIdentifier(args.getString(0), 
PARAMETERS[0].name());
-        String version = args.getString(1);
+        String version = args.isNullAt(1) ? null : args.getString(1);
 
         return modifyPaimonTable(
                 tableIdent,
                 table -> {
-                    if (version.chars().allMatch(Character::isDigit)) {
-                        table.rollbackTo(Long.parseLong(version));
+                    Long snapshot = null;
+                    String tag = null;
+                    if (!StringUtils.isNullOrWhitespaceOnly(version)) {
+                        Preconditions.checkState(
+                                args.isNullAt(2) && args.isNullAt(3),
+                                "only can set one of version/snapshot/tag in 
RollbackProcedure.");
+                        if (version.chars().allMatch(Character::isDigit)) {
+                            snapshot = Long.parseLong(version);
+                        } else {
+                            tag = version;
+                        }
+                    } else {
+                        Preconditions.checkState(
+                                (args.isNullAt(2) && !args.isNullAt(3)
+                                        || !args.isNullAt(2) && 
args.isNullAt(3)),
+                                "only can set one of version/snapshot/tag in 
RollbackProcedure.");
+                        snapshot = args.isNullAt(2) ? null : args.getLong(2);
+                        tag = args.isNullAt(3) ? null : args.getString(3);
+                    }
+
+                    if (snapshot != null) {
+                        table.rollbackTo(snapshot);
                     } else {
-                        table.rollbackTo(version);
+                        table.rollbackTo(tag);
                     }
                     InternalRow outputRow = newInternalRow(true);
                     return new InternalRow[] {outputRow};
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala
index 5f5facc57a..dde0af3d22 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala
@@ -94,6 +94,66 @@ class RollbackProcedureTest extends PaimonSparkTestBase with 
StreamTest {
     }
   }
 
+  test("Paimon Procedure: rollback to tag check test") {
+    spark.sql(s"""
+                 |CREATE TABLE T (a INT, b STRING)
+                 |TBLPROPERTIES ('primary-key'='a', 'bucket'='3', 
'file.format'='orc')
+                 |""".stripMargin)
+
+    val query = () => spark.sql("SELECT * FROM T ORDER BY a")
+
+    // snapshot-1
+    spark.sql("insert into T select 1, 'a'")
+    checkAnswer(query(), Row(1, "a") :: Nil)
+
+    checkAnswer(
+      spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => 
'20250122', snapshot => 1)"),
+      Row(true) :: Nil)
+
+    // snapshot-2
+    spark.sql("insert into T select 2, 'b'")
+    checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)
+
+    // snapshot-3
+    spark.sql("insert into T select 3, 'c'")
+    checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Nil)
+
+    // snapshot-4
+    spark.sql("insert into T select 4, 'd'")
+    checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Row(4, 
"d") :: Nil)
+
+    assertThrows[RuntimeException] {
+      spark.sql("CALL paimon.sys.rollback(table => 'test.T_exception', version 
=> '4')")
+    }
+    // rollback to snapshot
+    checkAnswer(
+      spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => '3')"),
+      Row(true) :: Nil)
+    checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Nil)
+
+    // only one of version/snapshot/tag may be set in a single call
+    assertThrows[RuntimeException] {
+      spark.sql(
+        "CALL paimon.sys.rollback(table => 'test.T', version => '20250122', 
tag => '20250122')")
+    }
+
+    assertThrows[RuntimeException] {
+      spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => 
'20250122', snapshot => 1)")
+    }
+
+    assertThrows[RuntimeException] {
+      spark.sql("CALL paimon.sys.rollback(table => 'test.T', tag => 
'20250122', snapshot => 1)")
+    }
+
+    // rollback to snapshot
+    spark.sql("CALL paimon.sys.rollback(table => 'test.T', snapshot => 2)")
+    checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)
+
+    // rollback to tag
+    spark.sql("CALL paimon.sys.rollback(table => 'test.T', tag => '20250122')")
+    checkAnswer(query(), Row(1, "a") :: Nil)
+  }
+
   test("Paimon Procedure: rollback to timestamp") {
     failAfter(streamingTimeout) {
       withTempDir {

Reply via email to