This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d0b7e97a8 [spark] Adjust ExpireSnapshotsProcedure param type (#4059)
d0b7e97a8 is described below
commit d0b7e97a820cdba05c9cd921d108b5748682edd7
Author: askwang <[email protected]>
AuthorDate: Tue Sep 24 09:57:13 2024 +0800
[spark] Adjust ExpireSnapshotsProcedure param type (#4059)
---
.../spark/procedure/ExpireSnapshotsProcedure.java | 15 ++++++++---
.../procedure/ExpireSnapshotsProcedureTest.scala | 29 ++++++++++++++++++++++
2 files changed, 40 insertions(+), 4 deletions(-)
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedure.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedure.java
index 4716f6add..f24f18067 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedure.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedure.java
@@ -20,6 +20,8 @@ package org.apache.paimon.spark.procedure;
import org.apache.paimon.options.ExpireConfig;
import org.apache.paimon.table.ExpireSnapshots;
+import org.apache.paimon.utils.DateTimeUtils;
+import org.apache.paimon.utils.StringUtils;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
@@ -29,10 +31,10 @@ import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.time.Duration;
+import java.util.TimeZone;
import static org.apache.spark.sql.types.DataTypes.IntegerType;
import static org.apache.spark.sql.types.DataTypes.StringType;
-import static org.apache.spark.sql.types.DataTypes.TimestampType;
/** A procedure to expire snapshots. */
public class ExpireSnapshotsProcedure extends BaseProcedure {
@@ -42,7 +44,7 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
ProcedureParameter.required("table", StringType),
ProcedureParameter.optional("retain_max", IntegerType),
ProcedureParameter.optional("retain_min", IntegerType),
- ProcedureParameter.optional("older_than", TimestampType),
+ ProcedureParameter.optional("older_than", StringType),
ProcedureParameter.optional("max_deletes", IntegerType)
};
@@ -72,8 +74,9 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
Identifier tableIdent = toIdentifier(args.getString(0),
PARAMETERS[0].name());
Integer retainMax = args.isNullAt(1) ? null : args.getInt(1);
Integer retainMin = args.isNullAt(2) ? null : args.getInt(2);
- Long olderThanMills = args.isNullAt(3) ? null : args.getLong(3) / 1000;
+ String olderThanStr = args.isNullAt(3) ? null : args.getString(3);
Integer maxDeletes = args.isNullAt(4) ? null : args.getInt(4);
+
return modifyPaimonTable(
tableIdent,
table -> {
@@ -85,7 +88,11 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
if (retainMin != null) {
builder.snapshotRetainMin(retainMin);
}
- if (olderThanMills != null) {
+ if (!StringUtils.isNullOrWhitespaceOnly(olderThanStr)) {
+ long olderThanMills =
+ DateTimeUtils.parseTimestampData(
+ olderThanStr, 3,
TimeZone.getDefault())
+ .getMillisecond();
builder.snapshotTimeRetain(
Duration.ofMillis(System.currentTimeMillis() -
olderThanMills));
}
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala
index 6fcb48d68..da7be4231 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala
@@ -19,10 +19,14 @@
package org.apache.paimon.spark.procedure
import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.utils.SnapshotManager
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.StreamTest
+import org.assertj.core.api.Assertions.{assertThat,
assertThatIllegalArgumentException}
+
+import java.sql.Timestamp
class ExpireSnapshotsProcedureTest extends PaimonSparkTestBase with StreamTest
{
@@ -136,4 +140,29 @@ class ExpireSnapshotsProcedureTest extends
PaimonSparkTestBase with StreamTest {
}
}
}
+
+ test("Paimon Procedure: test parameter older_than with string type") {
+ sql(
+ "CREATE TABLE T (a INT, b STRING) " +
+ "TBLPROPERTIES ( 'num-sorted-run.compaction-trigger' = '999' )")
+ val table = loadTable("T")
+ val snapshotManager = table.snapshotManager
+
+ // generate 5 snapshots
+ for (i <- 1 to 5) {
+ sql(s"INSERT INTO T VALUES ($i, '$i')")
+ }
+ checkSnapshots(snapshotManager, 1, 5)
+
+ val timestamp = new Timestamp(snapshotManager.latestSnapshot().timeMillis)
+ spark.sql(
+ s"CALL paimon.sys.expire_snapshots(table => 'test.T', older_than =>
'${timestamp.toString}', max_deletes => 2)")
+ checkSnapshots(snapshotManager, 3, 5)
+ }
+
+ def checkSnapshots(sm: SnapshotManager, earliest: Int, latest: Int): Unit = {
+ assertThat(sm.snapshotCount).isEqualTo(latest - earliest + 1)
+ assertThat(sm.earliestSnapshotId).isEqualTo(earliest)
+ assertThat(sm.latestSnapshotId).isEqualTo(latest)
+ }
}