This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e4b1200b [spark] Support specifying time-pattern in ExpirePartitions 
procedure (#3899)
0e4b1200b is described below

commit 0e4b1200bf5049a519fe0362466c60b30461cd79
Author: bknbkn <[email protected]>
AuthorDate: Mon Aug 5 18:52:13 2024 +0800

    [spark] Support specifying time-pattern in ExpirePartitions procedure 
(#3899)
---
 docs/content/spark/procedures.md                   |  4 +-
 .../spark/procedure/ExpirePartitionsProcedure.java |  5 +-
 .../procedure/ExpirePartitionsProcedureTest.scala  | 70 ++++++++++++++++++++++
 3 files changed, 77 insertions(+), 2 deletions(-)

diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index 3f9b3de85..c3b6292c7 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -73,9 +73,11 @@ This section introduce all available spark procedures about 
paimon.
             <li>table: the target table identifier. Cannot be empty.</li>
             <li>expiration_time: the expiration interval of a partition. A 
partition will be expired if it‘s lifetime is over this value. Partition time 
is extracted from the partition value.</li>
             <li>timestamp_formatter: the formatter to format timestamp from 
string.</li>
+            <li>timestamp_pattern: the pattern to get a timestamp from 
partitions.</li>
             <li>expire_strategy: specifies the expiration strategy for 
partition expiration, possible values: 'values-time' or 'update-time' , 
'values-time' as default.</li>
       </td>
-      <td>CALL sys.expire_partitions(table => 'default.T', expiration_time => 
'1 d', timestamp_formatter => 'yyyy-MM-dd', expire_strategy => 
'values-time')</td>
+      <td>CALL sys.expire_partitions(table => 'default.T', expiration_time => 
'1 d', timestamp_formatter => 
+'yyyy-MM-dd', timestamp_pattern => '$dt', expire_strategy => 
'values-time')</td>
     </tr>
     <tr>
       <td>create_tag</td>
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java
index 7501a9926..81012abc9 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java
@@ -50,6 +50,7 @@ public class ExpirePartitionsProcedure extends BaseProcedure {
                 ProcedureParameter.required("table", StringType),
                 ProcedureParameter.required("expiration_time", StringType),
                 ProcedureParameter.optional("timestamp_formatter", StringType),
+                ProcedureParameter.optional("timestamp_pattern", StringType),
                 ProcedureParameter.optional("expire_strategy", StringType)
             };
 
@@ -78,7 +79,8 @@ public class ExpirePartitionsProcedure extends BaseProcedure {
         Identifier tableIdent = toIdentifier(args.getString(0), 
PARAMETERS[0].name());
         String expirationTime = args.getString(1);
         String timestampFormatter = args.isNullAt(2) ? null : 
args.getString(2);
-        String expireStrategy = args.isNullAt(3) ? null : args.getString(3);
+        String timestampPattern = args.isNullAt(3) ? null : args.getString(3);
+        String expireStrategy = args.isNullAt(4) ? null : args.getString(4);
         return modifyPaimonTable(
                 tableIdent,
                 table -> {
@@ -87,6 +89,7 @@ public class ExpirePartitionsProcedure extends BaseProcedure {
                     Map<String, String> map = new HashMap<>();
                     map.put(CoreOptions.PARTITION_EXPIRATION_STRATEGY.key(), 
expireStrategy);
                     map.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), 
timestampFormatter);
+                    map.put(CoreOptions.PARTITION_TIMESTAMP_PATTERN.key(), 
timestampPattern);
 
                     PartitionExpire partitionExpire =
                             new PartitionExpire(
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala
index 1872abf6c..db4696047 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala
@@ -404,4 +404,74 @@ class ExpirePartitionsProcedureTest extends 
PaimonSparkTestBase with StreamTest
       }
     }
   }
+
+  test("Paimon procedure : expire partitions with specified time-pattern 
partitions.") {
+    failAfter(streamingTimeout) {
+      withTempDir {
+        checkpointDir =>
+          spark.sql(s"""
+                       |CREATE TABLE T (k STRING, pt STRING, hm STRING)
+                       |TBLPROPERTIES ('primary-key'='k,pt,hm', 'bucket'='1')
+                       | PARTITIONED BY (hm, pt)
+                       |""".stripMargin)
+          val location = loadTable("T").location().toString
+
+          val inputData = MemoryStream[(String, String, String)]
+          val stream = inputData
+            .toDS()
+            .toDF("k", "pt", "hm")
+            .writeStream
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .foreachBatch {
+              (batch: Dataset[Row], _: Long) =>
+                batch.write.format("paimon").mode("append").save(location)
+            }
+            .start()
+
+          val query = () => spark.sql("SELECT * FROM T")
+
+          try {
+            // Show results : There are no expired partitions.
+            checkAnswer(
+              spark.sql(
+                "CALL paimon.sys.expire_partitions(table => 'test.T', 
expiration_time => '1 d'" +
+                  ", timestamp_formatter => 'yyyy-MM-dd', timestamp_pattern => 
'$pt')"),
+              Row("No expired partitions.") :: Nil
+            )
+
+            // snapshot-1
+            inputData.addData(("a", "2024-06-01", "01:00"))
+            stream.processAllAvailable()
+            // snapshot-2
+            inputData.addData(("b", "2024-06-02", "02:00"))
+            stream.processAllAvailable()
+            // snapshot-3, never expires.
+            inputData.addData(("Never-expire", "9999-09-09", "99:99"))
+            stream.processAllAvailable()
+
+            checkAnswer(
+              query(),
+              Row("a", "2024-06-01", "01:00") :: Row("b", "2024-06-02", 
"02:00") :: Row(
+                "Never-expire",
+                "9999-09-09",
+                "99:99") :: Nil)
+
+            // Show a list of expired partitions.
+            checkAnswer(
+              spark.sql(
+                "CALL paimon.sys.expire_partitions(table => 'test.T'" +
+                  ", expiration_time => '1 d'" +
+                  ", timestamp_formatter => 'yyyy-MM-dd HH:mm'" +
+                  ", timestamp_pattern => '$pt $hm')"),
+              Row("hm=01:00, pt=2024-06-01") :: Row("hm=02:00, pt=2024-06-02") 
:: Nil
+            )
+
+            checkAnswer(query(), Row("Never-expire", "9999-09-09", "99:99") :: 
Nil)
+
+          } finally {
+            stream.stop()
+          }
+      }
+    }
+  }
 }

Reply via email to