This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d0b7e97a8 [spark] Adjust ExpireSnapshotsProcedure param type (#4059)
d0b7e97a8 is described below

commit d0b7e97a820cdba05c9cd921d108b5748682edd7
Author: askwang <[email protected]>
AuthorDate: Tue Sep 24 09:57:13 2024 +0800

    [spark] Adjust ExpireSnapshotsProcedure param type (#4059)
---
 .../spark/procedure/ExpireSnapshotsProcedure.java  | 15 ++++++++---
 .../procedure/ExpireSnapshotsProcedureTest.scala   | 29 ++++++++++++++++++++++
 2 files changed, 40 insertions(+), 4 deletions(-)

diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedure.java
index 4716f6add..f24f18067 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedure.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedure.java
@@ -20,6 +20,8 @@ package org.apache.paimon.spark.procedure;
 
 import org.apache.paimon.options.ExpireConfig;
 import org.apache.paimon.table.ExpireSnapshots;
+import org.apache.paimon.utils.DateTimeUtils;
+import org.apache.paimon.utils.StringUtils;
 
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.catalog.Identifier;
@@ -29,10 +31,10 @@ import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 
 import java.time.Duration;
+import java.util.TimeZone;
 
 import static org.apache.spark.sql.types.DataTypes.IntegerType;
 import static org.apache.spark.sql.types.DataTypes.StringType;
-import static org.apache.spark.sql.types.DataTypes.TimestampType;
 
 /** A procedure to expire snapshots. */
 public class ExpireSnapshotsProcedure extends BaseProcedure {
@@ -42,7 +44,7 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
                 ProcedureParameter.required("table", StringType),
                 ProcedureParameter.optional("retain_max", IntegerType),
                 ProcedureParameter.optional("retain_min", IntegerType),
-                ProcedureParameter.optional("older_than", TimestampType),
+                ProcedureParameter.optional("older_than", StringType),
                 ProcedureParameter.optional("max_deletes", IntegerType)
             };
 
@@ -72,8 +74,9 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
         Identifier tableIdent = toIdentifier(args.getString(0), 
PARAMETERS[0].name());
         Integer retainMax = args.isNullAt(1) ? null : args.getInt(1);
         Integer retainMin = args.isNullAt(2) ? null : args.getInt(2);
-        Long olderThanMills = args.isNullAt(3) ? null : args.getLong(3) / 1000;
+        String olderThanStr = args.isNullAt(3) ? null : args.getString(3);
         Integer maxDeletes = args.isNullAt(4) ? null : args.getInt(4);
+
         return modifyPaimonTable(
                 tableIdent,
                 table -> {
@@ -85,7 +88,11 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
                     if (retainMin != null) {
                         builder.snapshotRetainMin(retainMin);
                     }
-                    if (olderThanMills != null) {
+                    if (!StringUtils.isNullOrWhitespaceOnly(olderThanStr)) {
+                        long olderThanMills =
+                                DateTimeUtils.parseTimestampData(
+                                                olderThanStr, 3, 
TimeZone.getDefault())
+                                        .getMillisecond();
                         builder.snapshotTimeRetain(
                                 Duration.ofMillis(System.currentTimeMillis() - 
olderThanMills));
                     }
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala
index 6fcb48d68..da7be4231 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala
@@ -19,10 +19,14 @@
 package org.apache.paimon.spark.procedure
 
 import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.utils.SnapshotManager
 
 import org.apache.spark.sql.{Dataset, Row}
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.streaming.StreamTest
+import org.assertj.core.api.Assertions.{assertThat, 
assertThatIllegalArgumentException}
+
+import java.sql.Timestamp
 
 class ExpireSnapshotsProcedureTest extends PaimonSparkTestBase with StreamTest 
{
 
@@ -136,4 +140,29 @@ class ExpireSnapshotsProcedureTest extends 
PaimonSparkTestBase with StreamTest {
       }
     }
   }
+
+  test("Paimon Procedure: test parameter order_than with string type") {
+    sql(
+      "CREATE TABLE T (a INT, b STRING) " +
+        "TBLPROPERTIES ( 'num-sorted-run.compaction-trigger' = '999' )")
+    val table = loadTable("T")
+    val snapshotManager = table.snapshotManager
+
+    // generate 5 snapshot
+    for (i <- 1 to 5) {
+      sql(s"INSERT INTO T VALUES ($i, '$i')")
+    }
+    checkSnapshots(snapshotManager, 1, 5)
+
+    val timestamp = new Timestamp(snapshotManager.latestSnapshot().timeMillis)
+    spark.sql(
+      s"CALL paimon.sys.expire_snapshots(table => 'test.T', older_than => 
'${timestamp.toString}', max_deletes => 2)")
+    checkSnapshots(snapshotManager, 3, 5)
+  }
+
+  def checkSnapshots(sm: SnapshotManager, earliest: Int, latest: Int): Unit = {
+    assertThat(sm.snapshotCount).isEqualTo(latest - earliest + 1)
+    assertThat(sm.earliestSnapshotId).isEqualTo(earliest)
+    assertThat(sm.latestSnapshotId).isEqualTo(latest)
+  }
 }

Reply via email to