This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 21e6329d16 [procedure] Enhance return result for rollback procedure (#5533)
21e6329d16 is described below

commit 21e6329d1640a7eeabac973895693780afbd9941
Author: askwang <[email protected]>
AuthorDate: Fri Apr 25 13:19:56 2025 +0800

    [procedure] Enhance return result for rollback procedure (#5533)
---
 .../flink/procedure/RollbackToProcedure.java       | 17 ++++-
 .../procedure/RollbackToTimestampProcedure.java    | 13 +++-
 .../flink/procedure/RollbackProcedureITCase.java   | 88 ++++++++++++++++++++++
 .../paimon/spark/sql/RollbackProcedureTest.scala   |  5 +-
 .../paimon/spark/procedure/RollbackProcedure.java  | 22 +++++-
 .../procedure/RollbackToTimestampProcedure.java    | 28 ++++---
 .../procedure/RollbackToWatermarkProcedure.java    | 28 ++++---
 .../spark/procedure/RollbackProcedureTest.scala    | 16 ++--
 8 files changed, 181 insertions(+), 36 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java
index 9bca6505c9..8320453f8e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java
@@ -18,15 +18,20 @@
 
 package org.apache.paimon.flink.procedure;
 
+import org.apache.paimon.FileStore;
+import org.apache.paimon.Snapshot;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.StringUtils;
 
 import org.apache.flink.table.annotation.ArgumentHint;
 import org.apache.flink.table.annotation.DataTypeHint;
 import org.apache.flink.table.annotation.ProcedureHint;
 import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.flink.types.Row;
 
 /**
  * Rollback procedure. Usage:
@@ -52,16 +57,24 @@ public class RollbackToProcedure extends ProcedureBase {
                         type = @DataTypeHint("BIGINT"),
                         isOptional = true)
             })
-    public String[] call(
+    public @DataTypeHint("ROW<previous_snapshot_id BIGINT, current_snapshot_id BIGINT>") Row[] call(
             ProcedureContext procedureContext, String tableId, String tagName, Long snapshotId)
             throws Catalog.TableNotExistException {
         Table table = catalog.getTable(Identifier.fromString(tableId));
+
+        FileStore<?> store = ((FileStoreTable) table).store();
+        Snapshot latestSnapshot = store.snapshotManager().latestSnapshot();
+        Preconditions.checkNotNull(latestSnapshot, "Latest snapshot is null, can not rollback.");
+
+        long rollbackSnapshotId;
         if (!StringUtils.isNullOrWhitespaceOnly(tagName)) {
             table.rollbackTo(tagName);
+            rollbackSnapshotId = store.newTagManager().getOrThrow(tagName).trimToSnapshot().id();
         } else {
             table.rollbackTo(snapshotId);
+            rollbackSnapshotId = snapshotId;
         }
-        return new String[] {"Success"};
+        return new Row[] {Row.of(latestSnapshot.id(), rollbackSnapshotId)};
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToTimestampProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToTimestampProcedure.java
index f84dab8eab..a0b212a42d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToTimestampProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToTimestampProcedure.java
@@ -24,11 +24,13 @@ import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.SnapshotManager;
 
 import org.apache.flink.table.annotation.ArgumentHint;
 import org.apache.flink.table.annotation.DataTypeHint;
 import org.apache.flink.table.annotation.ProcedureHint;
 import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.flink.types.Row;
 
 /**
  * Rollback to timestamp procedure. Usage:
@@ -47,16 +49,21 @@ public class RollbackToTimestampProcedure extends ProcedureBase {
                 @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
                 @ArgumentHint(name = "timestamp", type = @DataTypeHint("BIGINT"))
             })
-    public String[] call(ProcedureContext procedureContext, String tableId, Long timestamp)
+    public @DataTypeHint("ROW<previous_snapshot_id BIGINT, current_snapshot_id BIGINT>") Row[] call(
+            ProcedureContext procedureContext, String tableId, Long timestamp)
             throws Catalog.TableNotExistException {
         Table table = catalog.getTable(Identifier.fromString(tableId));
         FileStoreTable fileStoreTable = (FileStoreTable) table;
-        Snapshot snapshot = fileStoreTable.snapshotManager().earlierOrEqualTimeMills(timestamp);
+        SnapshotManager snapshotManager = fileStoreTable.snapshotManager();
+        Snapshot latestSnapshot = snapshotManager.latestSnapshot();
+        Preconditions.checkNotNull(latestSnapshot, "Latest snapshot is null, can not rollback.");
+
+        Snapshot snapshot = snapshotManager.earlierOrEqualTimeMills(timestamp);
         Preconditions.checkNotNull(
                 snapshot, String.format("count not find snapshot earlier than %s", timestamp));
         long snapshotId = snapshot.id();
         fileStoreTable.rollbackTo(snapshotId);
-        return new String[] {String.format("Success roll back to snapshot: %s .", snapshotId)};
+        return new Row[] {Row.of(latestSnapshot.id(), snapshotId)};
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RollbackProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RollbackProcedureITCase.java
new file mode 100644
index 0000000000..884237a16f
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RollbackProcedureITCase.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.flink.CatalogITCaseBase;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** IT Case for {@link RollbackToProcedure} and {@link RollbackToTimestampProcedure}. */
+public class RollbackProcedureITCase extends CatalogITCaseBase {
+
+    @Test
+    public void testRollbackTo() throws Exception {
+        sql(
+                "CREATE TABLE T (id STRING, name STRING,"
+                        + " PRIMARY KEY (id) NOT ENFORCED)"
+                        + " WITH ('bucket'='1', 'write-only'='true')");
+
+        FileStoreTable table = paimonTable("T");
+        SnapshotManager snapshotManager = table.snapshotManager();
+
+        for (int i = 1; i <= 5; i++) {
+            sql("INSERT INTO T VALUES ('" + i + "', '" + i + "')");
+        }
+        assertEquals(5, snapshotManager.latestSnapshotId());
+
+        sql("CALL sys.create_tag(`table` => 'default.T', tag => 'tag-2', snapshot_id => 2)");
+
+        // rollback to snapshot_id
+        long latestSnapshotId = snapshotManager.latestSnapshot().id();
+        assertThat(sql("CALL sys.rollback_to(`table` => 'default.T', snapshot_id => 4)"))
+                .containsExactly(Row.of(latestSnapshotId, 4L));
+
+        // rollback to tag
+        latestSnapshotId = snapshotManager.latestSnapshot().id();
+        assertThat(sql("CALL sys.rollback_to(`table` => 'default.T', tag => 'tag-2')"))
+                .containsExactly(Row.of(latestSnapshotId, 2L));
+    }
+
+    @Test
+    public void testRollbackToTimestamp() throws Exception {
+        sql(
+                "CREATE TABLE T (id STRING, name STRING,"
+                        + " PRIMARY KEY (id) NOT ENFORCED)"
+                        + " WITH ('bucket'='1', 'write-only'='true')");
+
+        FileStoreTable table = paimonTable("T");
+        SnapshotManager snapshotManager = table.snapshotManager();
+
+        sql("INSERT INTO T VALUES ('1', 'a')");
+        sql("INSERT INTO T VALUES ('1', 'b')");
+        long timestamp = System.currentTimeMillis();
+
+        sql("INSERT INTO T VALUES ('3', 'c')");
+        assertEquals(3, snapshotManager.latestSnapshotId());
+
+        // rollback to timestamp
+        long latestSnapshotId = snapshotManager.latestSnapshot().id();
+        assertThat(
+                        sql(
+                                String.format(
+                                        "CALL sys.rollback_to_timestamp(`table` => 'default.T', `timestamp` => %s)",
+                                        timestamp)))
+                .containsExactly(Row.of(latestSnapshotId, 2L));
+    }
+}
diff --git a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RollbackProcedureTest.scala b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RollbackProcedureTest.scala
index 7a3a5730ed..605b4cadf6 100644
--- a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RollbackProcedureTest.scala
+++ b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RollbackProcedureTest.scala
@@ -51,6 +51,7 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest {
             }
             .start()
 
+          val table = loadTable("T")
           val query = () => spark.sql("SELECT * FROM T ORDER BY a")
 
           try {
@@ -79,13 +80,13 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest {
             // rollback to snapshot
             checkAnswer(
               spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => '2')"),
-              Row(true) :: Nil)
+              Row(table.latestSnapshot().get().id, 2) :: Nil)
             checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)
 
             // rollback to tag
             checkAnswer(
               spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => 'test_tag')"),
-              Row(true) :: Nil)
+              Row(table.latestSnapshot().get().id, 1) :: Nil)
             checkAnswer(query(), Row(1, "a") :: Nil)
           } finally {
             stream.stop()
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java
index d9a8876332..61060554dc 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java
@@ -18,6 +18,9 @@
 
 package org.apache.paimon.spark.procedure;
 
+import org.apache.paimon.FileStore;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.StringUtils;
 
@@ -47,7 +50,13 @@ public class RollbackProcedure extends BaseProcedure {
     private static final StructType OUTPUT_TYPE =
             new StructType(
                     new StructField[] {
-                        new StructField("result", DataTypes.BooleanType, true, Metadata.empty())
+                        new StructField(
+                                "previous_snapshot_id",
+                                DataTypes.LongType,
+                                false,
+                                Metadata.empty()),
+                        new StructField(
+                                "current_snapshot_id", DataTypes.LongType, false, Metadata.empty())
                     });
 
     private RollbackProcedure(TableCatalog tableCatalog) {
@@ -92,12 +101,21 @@ public class RollbackProcedure extends BaseProcedure {
                         tag = args.isNullAt(3) ? null : args.getString(3);
                     }
 
+                    FileStore<?> store = ((FileStoreTable) table).store();
+                    Snapshot latestSnapshot = store.snapshotManager().latestSnapshot();
+                    Preconditions.checkNotNull(
+                            latestSnapshot, "Latest snapshot is null, can not rollback.");
+
+                    long currentSnapshotId;
                     if (snapshot != null) {
                         table.rollbackTo(snapshot);
+                        currentSnapshotId = snapshot;
                     } else {
                         table.rollbackTo(tag);
+                        currentSnapshotId =
+                                store.newTagManager().getOrThrow(tag).trimToSnapshot().id();
                     }
-                    InternalRow outputRow = newInternalRow(true);
+                    InternalRow outputRow = newInternalRow(latestSnapshot.id(), currentSnapshotId);
                     return new InternalRow[] {outputRow};
                 });
     }
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackToTimestampProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackToTimestampProcedure.java
index a01f08b3fc..d0dae70484 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackToTimestampProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackToTimestampProcedure.java
@@ -21,14 +21,15 @@ package org.apache.paimon.spark.procedure;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.SnapshotManager;
 
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.Metadata;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
-import org.apache.spark.unsafe.types.UTF8String;
 
 import static org.apache.spark.sql.types.DataTypes.LongType;
 import static org.apache.spark.sql.types.DataTypes.StringType;
@@ -46,7 +47,13 @@ public class RollbackToTimestampProcedure extends BaseProcedure {
     private static final StructType OUTPUT_TYPE =
             new StructType(
                     new StructField[] {
-                        new StructField("result", StringType, true, Metadata.empty())
+                        new StructField(
+                                "previous_snapshot_id",
+                                DataTypes.LongType,
+                                false,
+                                Metadata.empty()),
+                        new StructField(
+                                "current_snapshot_id", DataTypes.LongType, false, Metadata.empty())
                     });
 
     private RollbackToTimestampProcedure(TableCatalog tableCatalog) {
@@ -72,19 +79,18 @@ public class RollbackToTimestampProcedure extends BaseProcedure {
                 tableIdent,
                 table -> {
                     FileStoreTable fileStoreTable = (FileStoreTable) table;
-                    Snapshot snapshot =
-                            fileStoreTable.snapshotManager().earlierOrEqualTimeMills(timestamp);
+                    SnapshotManager snapshotManager = fileStoreTable.snapshotManager();
+                    Snapshot latestSnapshot = snapshotManager.latestSnapshot();
+                    Preconditions.checkNotNull(
+                            latestSnapshot, "Latest snapshot is null, can not rollback.");
+
+                    Snapshot snapshot = snapshotManager.earlierOrEqualTimeMills(timestamp);
                     Preconditions.checkNotNull(
                             snapshot,
-                            String.format("count not find snapshot earlier than %s", timestamp));
+                            String.format("Can not find snapshot earlier than %s", timestamp));
                     long snapshotId = snapshot.id();
                     fileStoreTable.rollbackTo(snapshotId);
-                    InternalRow outputRow =
-                            newInternalRow(
-                                    UTF8String.fromString(
-                                            String.format(
-                                                    "Success roll back to snapshot: %s .",
-                                                    snapshotId)));
+                    InternalRow outputRow = newInternalRow(latestSnapshot.id(), snapshotId);
                     return new InternalRow[] {outputRow};
                 });
     }
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackToWatermarkProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackToWatermarkProcedure.java
index 09185f02c9..ac11fa2f32 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackToWatermarkProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackToWatermarkProcedure.java
@@ -21,14 +21,15 @@ package org.apache.paimon.spark.procedure;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.SnapshotManager;
 
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.Metadata;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
-import org.apache.spark.unsafe.types.UTF8String;
 
 import static org.apache.spark.sql.types.DataTypes.LongType;
 import static org.apache.spark.sql.types.DataTypes.StringType;
@@ -46,7 +47,13 @@ public class RollbackToWatermarkProcedure extends BaseProcedure {
     private static final StructType OUTPUT_TYPE =
             new StructType(
                     new StructField[] {
-                        new StructField("result", StringType, true, Metadata.empty())
+                        new StructField(
+                                "previous_snapshot_id",
+                                DataTypes.LongType,
+                                false,
+                                Metadata.empty()),
+                        new StructField(
+                                "current_snapshot_id", DataTypes.LongType, false, Metadata.empty())
                     });
 
     private RollbackToWatermarkProcedure(TableCatalog tableCatalog) {
@@ -72,19 +79,18 @@ public class RollbackToWatermarkProcedure extends BaseProcedure {
                 tableIdent,
                 table -> {
                     FileStoreTable fileStoreTable = (FileStoreTable) table;
-                    Snapshot snapshot =
-                            fileStoreTable.snapshotManager().earlierOrEqualWatermark(watermark);
+                    SnapshotManager snapshotManager = fileStoreTable.snapshotManager();
+                    Snapshot latestSnapshot = snapshotManager.latestSnapshot();
+                    Preconditions.checkNotNull(
+                            latestSnapshot, "Latest snapshot is null, can not rollback.");
+
+                    Snapshot snapshot = snapshotManager.earlierOrEqualWatermark(watermark);
                     Preconditions.checkNotNull(
                             snapshot,
-                            String.format("count not find snapshot earlier than %s", watermark));
+                            String.format("Can not find snapshot earlier than %s", watermark));
                     long snapshotId = snapshot.id();
                     fileStoreTable.rollbackTo(snapshotId);
-                    InternalRow outputRow =
-                            newInternalRow(
-                                    UTF8String.fromString(
-                                            String.format(
-                                                    "Success roll back to snapshot: %s .",
-                                                    snapshotId)));
+                    InternalRow outputRow = newInternalRow(latestSnapshot.id(), snapshotId);
                     return new InternalRow[] {outputRow};
                 });
     }
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala
index dde0af3d22..66f2d57e02 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala
@@ -37,7 +37,8 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest {
                        |CREATE TABLE T (a INT, b STRING)
                        |TBLPROPERTIES ('primary-key'='a', 'bucket'='3')
                        |""".stripMargin)
-          val location = loadTable("T").location().toString
+          val table = loadTable("T")
+          val location = table.location().toString
 
           val inputData = MemoryStream[(Int, String)]
           val stream = inputData
@@ -79,13 +80,14 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest {
             // rollback to snapshot
             checkAnswer(
               spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => '2')"),
-              Row(true) :: Nil)
+              Row(table.latestSnapshot().get().id, 2) :: Nil)
             checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)
 
             // rollback to tag
+            val taggedSnapshotId = table.tagManager().getOrThrow("test_tag").trimToSnapshot().id
             checkAnswer(
               spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => 'test_tag')"),
-              Row(true) :: Nil)
+              Row(table.latestSnapshot().get().id, taggedSnapshotId) :: Nil)
             checkAnswer(query(), Row(1, "a") :: Nil)
           } finally {
             stream.stop()
@@ -100,6 +102,8 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest {
                  |TBLPROPERTIES ('primary-key'='a', 'bucket'='3', 'file.format'='orc')
                  |""".stripMargin)
 
+    val table = loadTable("T")
+
     val query = () => spark.sql("SELECT * FROM T ORDER BY a")
 
     // snapshot-1
@@ -128,7 +132,7 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest {
     // rollback to snapshot
     checkAnswer(
       spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => '3')"),
-      Row(true) :: Nil)
+      Row(table.latestSnapshot().get().id, 3) :: Nil)
     checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Nil)
 
     // version/snapshot/tag can only set one of them
@@ -177,6 +181,8 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest {
             }
             .start()
 
+          val table = loadTable("T")
+
           val query = () => spark.sql("SELECT * FROM T ORDER BY a")
 
           try {
@@ -201,7 +207,7 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest {
             checkAnswer(
               spark.sql(
                 s"CALL paimon.sys.rollback_to_timestamp(table => 'test.T', timestamp => $timestamp)"),
-              Row("Success roll back to snapshot: 2 .") :: Nil)
+              Row(table.latestSnapshot().get().id, 2) :: Nil)
             checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)
 
           } finally {

Reply via email to