This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 0e4b1200b [spark] Support specifying time-pattern in ExpirePartitions
procedure (#3899)
0e4b1200b is described below
commit 0e4b1200bf5049a519fe0362466c60b30461cd79
Author: bknbkn <[email protected]>
AuthorDate: Mon Aug 5 18:52:13 2024 +0800
[spark] Support specifying time-pattern in ExpirePartitions procedure
(#3899)
---
docs/content/spark/procedures.md | 4 +-
.../spark/procedure/ExpirePartitionsProcedure.java | 5 +-
.../procedure/ExpirePartitionsProcedureTest.scala | 70 ++++++++++++++++++++++
3 files changed, 77 insertions(+), 2 deletions(-)
diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index 3f9b3de85..c3b6292c7 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -73,9 +73,11 @@ This section introduces all available spark procedures about
paimon.
<li>table: the target table identifier. Cannot be empty.</li>
<li>expiration_time: the expiration interval of a partition. A
partition will be expired if its lifetime is over this value. Partition time
is extracted from the partition value.</li>
<li>timestamp_formatter: the formatter to format timestamp from
string.</li>
+ <li>timestamp_pattern: the pattern to get a timestamp from
partitions.</li>
<li>expire_strategy: specifies the expiration strategy for
partition expiration, possible values: 'values-time' or 'update-time' ,
'values-time' as default.</li>
</td>
- <td>CALL sys.expire_partitions(table => 'default.T', expiration_time =>
'1 d', timestamp_formatter => 'yyyy-MM-dd', expire_strategy =>
'values-time')</td>
+ <td>CALL sys.expire_partitions(table => 'default.T', expiration_time =>
'1 d', timestamp_formatter =>
+'yyyy-MM-dd', timestamp_pattern => '$dt', expire_strategy =>
'values-time')</td>
</tr>
<tr>
<td>create_tag</td>
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java
index 7501a9926..81012abc9 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java
@@ -50,6 +50,7 @@ public class ExpirePartitionsProcedure extends BaseProcedure {
ProcedureParameter.required("table", StringType),
ProcedureParameter.required("expiration_time", StringType),
ProcedureParameter.optional("timestamp_formatter", StringType),
+ ProcedureParameter.optional("timestamp_pattern", StringType),
ProcedureParameter.optional("expire_strategy", StringType)
};
@@ -78,7 +79,8 @@ public class ExpirePartitionsProcedure extends BaseProcedure {
Identifier tableIdent = toIdentifier(args.getString(0),
PARAMETERS[0].name());
String expirationTime = args.getString(1);
String timestampFormatter = args.isNullAt(2) ? null :
args.getString(2);
- String expireStrategy = args.isNullAt(3) ? null : args.getString(3);
+ String timestampPattern = args.isNullAt(3) ? null : args.getString(3);
+ String expireStrategy = args.isNullAt(4) ? null : args.getString(4);
return modifyPaimonTable(
tableIdent,
table -> {
@@ -87,6 +89,7 @@ public class ExpirePartitionsProcedure extends BaseProcedure {
Map<String, String> map = new HashMap<>();
map.put(CoreOptions.PARTITION_EXPIRATION_STRATEGY.key(),
expireStrategy);
map.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(),
timestampFormatter);
+ map.put(CoreOptions.PARTITION_TIMESTAMP_PATTERN.key(),
timestampPattern);
PartitionExpire partitionExpire =
new PartitionExpire(
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala
index 1872abf6c..db4696047 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala
@@ -404,4 +404,74 @@ class ExpirePartitionsProcedureTest extends
PaimonSparkTestBase with StreamTest
}
}
}
+
+ test("Paimon procedure : expire partitions with specified time-pattern
partitions.") {
+ failAfter(streamingTimeout) {
+ withTempDir {
+ checkpointDir =>
+ spark.sql(s"""
+ |CREATE TABLE T (k STRING, pt STRING, hm STRING)
+ |TBLPROPERTIES ('primary-key'='k,pt,hm', 'bucket'='1')
+ | PARTITIONED BY (hm, pt)
+ |""".stripMargin)
+ val location = loadTable("T").location().toString
+
+ val inputData = MemoryStream[(String, String, String)]
+ val stream = inputData
+ .toDS()
+ .toDF("k", "pt", "hm")
+ .writeStream
+ .option("checkpointLocation", checkpointDir.getCanonicalPath)
+ .foreachBatch {
+ (batch: Dataset[Row], _: Long) =>
+ batch.write.format("paimon").mode("append").save(location)
+ }
+ .start()
+
+ val query = () => spark.sql("SELECT * FROM T")
+
+ try {
+ // Show results : There are no expired partitions.
+ checkAnswer(
+ spark.sql(
+ "CALL paimon.sys.expire_partitions(table => 'test.T',
expiration_time => '1 d'" +
+ ", timestamp_formatter => 'yyyy-MM-dd', timestamp_pattern =>
'$pt')"),
+ Row("No expired partitions.") :: Nil
+ )
+
+ // snapshot-1
+ inputData.addData(("a", "2024-06-01", "01:00"))
+ stream.processAllAvailable()
+ // snapshot-2
+ inputData.addData(("b", "2024-06-02", "02:00"))
+ stream.processAllAvailable()
+ // snapshot-3, never expires.
+ inputData.addData(("Never-expire", "9999-09-09", "99:99"))
+ stream.processAllAvailable()
+
+ checkAnswer(
+ query(),
+ Row("a", "2024-06-01", "01:00") :: Row("b", "2024-06-02",
"02:00") :: Row(
+ "Never-expire",
+ "9999-09-09",
+ "99:99") :: Nil)
+
+ // Show a list of expired partitions.
+ checkAnswer(
+ spark.sql(
+ "CALL paimon.sys.expire_partitions(table => 'test.T'" +
+ ", expiration_time => '1 d'" +
+ ", timestamp_formatter => 'yyyy-MM-dd HH:mm'" +
+ ", timestamp_pattern => '$pt $hm')"),
+ Row("hm=01:00, pt=2024-06-01") :: Row("hm=02:00, pt=2024-06-02")
:: Nil
+ )
+
+ checkAnswer(query(), Row("Never-expire", "9999-09-09", "99:99") ::
Nil)
+
+ } finally {
+ stream.stop()
+ }
+ }
+ }
+ }
}