This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d5f14ec2 [spark] spark support expire partitions procedure. (#3540)
7d5f14ec2 is described below

commit 7d5f14ec252b82d2c58450f00d2b75919a065619
Author: HunterXHunter <[email protected]>
AuthorDate: Thu Jun 20 11:21:23 2024 +0800

    [spark] spark support expire partitions procedure. (#3540)
---
 docs/content/spark/procedures.md                   |  10 ++
 .../org/apache/paimon/spark/SparkProcedures.java   |   2 +
 .../spark/procedure/ExpirePartitionsProcedure.java | 106 +++++++++++++++++++++
 .../procedure/ExpirePartitionsProcedureTest.scala  |  82 ++++++++++++++++
 4 files changed, 200 insertions(+)

diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index 81d829583..130c3137f 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -65,6 +65,16 @@ This section introduce all available spark procedures about 
paimon.
       </td>
       <td>CALL sys.expire_snapshots(table => 'default.T', retain_max => 
10)</td>
     </tr>
+    <tr>
+      <td>expire_partitions</td>
+      <td>
+         To expire partitions. Argument:
+            <li>table: the target table identifier. Cannot be empty.</li>
+            <li>expiration_time: the expiration interval of a partition. A 
partition will be expired if its lifetime exceeds this value. Partition time 
is extracted from the partition value.</li>
+            <li>timestamp_formatter: the formatter used to parse the timestamp 
from a string.</li>
+      </td>
+      <td>CALL sys.expire_partitions(table => 'default.T', expiration_time => 
'1 d', timestamp_formatter => 'yyyy-MM-dd')</td>
+    </tr>
     <tr>
       <td>create_tag</td>
       <td>
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
index 98a920b32..070f2ffcb 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
@@ -23,6 +23,7 @@ import 
org.apache.paimon.spark.procedure.CreateBranchProcedure;
 import org.apache.paimon.spark.procedure.CreateTagProcedure;
 import org.apache.paimon.spark.procedure.DeleteBranchProcedure;
 import org.apache.paimon.spark.procedure.DeleteTagProcedure;
+import org.apache.paimon.spark.procedure.ExpirePartitionsProcedure;
 import org.apache.paimon.spark.procedure.ExpireSnapshotsProcedure;
 import org.apache.paimon.spark.procedure.MigrateFileProcedure;
 import org.apache.paimon.spark.procedure.MigrateTableProcedure;
@@ -63,6 +64,7 @@ public class SparkProcedures {
         procedureBuilders.put("migrate_file", MigrateFileProcedure::builder);
         procedureBuilders.put("remove_orphan_files", 
RemoveOrphanFilesProcedure::builder);
         procedureBuilders.put("expire_snapshots", 
ExpireSnapshotsProcedure::builder);
+        procedureBuilders.put("expire_partitions", 
ExpirePartitionsProcedure::builder);
         procedureBuilders.put("repair", RepairProcedure::builder);
         return procedureBuilders.build();
     }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java
new file mode 100644
index 000000000..b41c37181
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.paimon.FileStore;
+import org.apache.paimon.operation.PartitionExpire;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.TimeUtils;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.time.Duration;
+
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/**
+ * A procedure to expire partitions. Usage:
+ *
+ * <pre><code>
+ *  CALL sys.expire_partitions(table => 'default.T', expiration_time => '1 d',
+ *      timestamp_formatter => 'yyyy-MM-dd')
+ * </code></pre>
+ *
+ * <p>{@code table} and {@code expiration_time} must be supplied; {@code timestamp_formatter}
+ * may be omitted, in which case {@code null} is handed to {@link PartitionExpire}.
+ */
+public class ExpirePartitionsProcedure extends BaseProcedure {
+
+    // expiration_time is always parsed into a Duration in call(), so it is declared required:
+    // a missing value is then reported when the arguments are validated instead of surfacing
+    // as a NullPointerException while the procedure is already running.
+    private static final ProcedureParameter[] PARAMETERS =
+            new ProcedureParameter[] {
+                ProcedureParameter.required("table", StringType),
+                ProcedureParameter.required("expiration_time", StringType),
+                ProcedureParameter.optional("timestamp_formatter", StringType)
+            };
+
+    // Single nullable boolean column named "result".
+    private static final StructType OUTPUT_TYPE =
+            new StructType(
+                    new StructField[] {
+                        new StructField("result", DataTypes.BooleanType, true, Metadata.empty())
+                    });
+
+    protected ExpirePartitionsProcedure(TableCatalog tableCatalog) {
+        super(tableCatalog);
+    }
+
+    @Override
+    public ProcedureParameter[] parameters() {
+        return PARAMETERS;
+    }
+
+    @Override
+    public StructType outputType() {
+        return OUTPUT_TYPE;
+    }
+
+    /**
+     * Expires partitions of the target table.
+     *
+     * @param args positional arguments in {@link #PARAMETERS} order: table identifier,
+     *     expiration time (parsed with {@link TimeUtils#parseDuration}), and the optional
+     *     timestamp formatter
+     * @return a single row holding {@code true} once the expiration call has returned
+     */
+    @Override
+    public InternalRow[] call(InternalRow args) {
+        Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
+        String expirationTime = args.getString(1);
+        // Optional argument: getString on a null slot throws, so check for null first.
+        String timestampFormatter = args.isNullAt(2) ? null : args.getString(2);
+        return modifyPaimonTable(
+                tableIdent,
+                table -> {
+                    FileStoreTable fileStoreTable = (FileStoreTable) table;
+                    FileStore fileStore = fileStoreTable.store();
+                    // NOTE(review): the zero Duration is presumably the check interval and the
+                    // null presumably the timestamp pattern - confirm against PartitionExpire.
+                    PartitionExpire partitionExpire =
+                            new PartitionExpire(
+                                    fileStore.partitionType(),
+                                    TimeUtils.parseDuration(expirationTime),
+                                    Duration.ofMillis(0L),
+                                    null,
+                                    timestampFormatter,
+                                    fileStore.newScan(),
+                                    fileStore.newCommit(""));
+                    partitionExpire.expire(Long.MAX_VALUE);
+                    InternalRow outputRow = newInternalRow(true);
+                    return new InternalRow[] {outputRow};
+                });
+    }
+
+    /** Returns a builder that creates this procedure from the builder's table catalog. */
+    public static ProcedureBuilder builder() {
+        return new BaseProcedure.Builder<ExpirePartitionsProcedure>() {
+            @Override
+            public ExpirePartitionsProcedure doBuild() {
+                return new ExpirePartitionsProcedure(tableCatalog());
+            }
+        };
+    }
+
+    @Override
+    public String description() {
+        return "ExpirePartitionsProcedure";
+    }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala
new file mode 100644
index 000000000..547e77a6e
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.streaming.StreamTest
+
+/**
+ * IT Case for expire partitions procedure: writes two rows with different `pt` partition
+ * values through a streaming query, calls `expire_partitions`, and checks which row is
+ * still returned by a query on the table afterwards.
+ */
+class ExpirePartitionsProcedureTest extends PaimonSparkTestBase with 
StreamTest {
+
+  import testImplicits._
+
+  test("Paimon Procedure: expire partitions") {
+    failAfter(streamingTimeout) {
+      withTempDir {
+        checkpointDir =>
+          spark.sql(s"""
+                       |CREATE TABLE T (k STRING, pt STRING)
+                       |TBLPROPERTIES ('primary-key'='k,pt', 'bucket'='1')
+                       | PARTITIONED BY (pt)
+                       |""".stripMargin)
+          val location = loadTable("T").location().toString
+
+          val inputData = MemoryStream[(String, String)]
+          val stream = inputData
+            .toDS()
+            .toDF("k", "pt")
+            .writeStream
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .foreachBatch {
+              (batch: Dataset[Row], _: Long) =>
+                batch.write.format("paimon").mode("append").save(location)
+            }
+            .start()
+
+          val query = () => spark.sql("SELECT * FROM T")
+
+          try {
+            // snapshot-1: row in partition pt = 2024-06-01, expected to be expired below
+            inputData.addData(("a", "2024-06-01"))
+            stream.processAllAvailable()
+
+            // snapshot-2: row in partition pt = 9024-06-01 (year 9024), expected to remain
+            inputData.addData(("b", "9024-06-01"))
+            stream.processAllAvailable()
+
+            checkAnswer(query(), Row("a", "2024-06-01") :: Row("b", 
"9024-06-01") :: Nil)
+            // expire with expiration_time '1 d'; the call must return a single Row(true)
+            checkAnswer(
+              spark.sql(
+                "CALL paimon.sys.expire_partitions(table => 'test.T', 
expiration_time => '1 d'" +
+                  ", timestamp_formatter => 'yyyy-MM-dd')"),
+              Row(true) :: Nil)
+
+            checkAnswer(query(), Row("b", "9024-06-01") :: Nil)
+
+          } finally {
+            stream.stop()
+          }
+      }
+    }
+  }
+}

Reply via email to