This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a5f48622 [Spark] Truncate Partition (#2371)
5a5f48622 is described below

commit 5a5f48622283bbec4fb466edeb32b6b67e8897eb
Author: Yann Byron <[email protected]>
AuthorDate: Thu Nov 23 13:06:56 2023 +0800

    [Spark] Truncate Partition (#2371)
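
    The change lets Spark's TRUNCATE TABLE ... PARTITION statement resolve to
    Paimon's truncate command, so a single partition can be cleared instead of
    the whole table. A minimal usage sketch, assuming an active SparkSession
    `spark` with a Paimon catalog configured (table and column names mirror
    the tests in this commit):

        spark.sql(
          "CREATE TABLE T (a INT, b INT, c LONG) PARTITIONED BY (c)" +
            " TBLPROPERTIES ('primary-key'='a,c')")
        spark.sql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)")
        // Clears only the rows in partition c=111; partition c=222 is untouched.
        spark.sql("TRUNCATE TABLE T PARTITION (c = 111)")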
---
 .../sql/catalyst/analysis/PaimonAnalysis.scala     | 16 +++++++++-
 .../org/apache/paimon/spark/SparkWriteITCase.java  | 34 +++++++++++++++++++---
 2 files changed, 45 insertions(+), 5 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonAnalysis.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonAnalysis.scala
index caca14c73..377539b0a 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonAnalysis.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonAnalysis.scala
@@ -23,7 +23,7 @@ import org.apache.paimon.table.FileStoreTable
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.analysis.PaimonRelation.isPaimonTable
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, MergeIntoTable, OverwritePartitionsDynamic, PaimonTableValuedFunctions, PaimonTableValueFunction, TruncateTable}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, MergeIntoTable, OverwritePartitionsDynamic, PaimonTableValuedFunctions, PaimonTableValueFunction, TruncatePartition, TruncateTable}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 
@@ -50,6 +50,20 @@ case class PaimonPostHocResolutionRules(session: SparkSession) extends Rule[Logi
       case t @ TruncateTable(PaimonRelation(table)) if t.resolved =>
         PaimonTruncateTableCommand(table, Map.empty)
 
+      case t @ TruncatePartition(PaimonRelation(table), ResolvedPartitionSpec(names, ident, _))
+          if t.resolved =>
+        assert(names.length == ident.numFields, "Names and values of partition don't match")
+        val resolver = session.sessionState.conf.resolver
+        val schema = table.schema()
+        val partitionSpec = names.zipWithIndex.map {
+          case (name, index) =>
+            val field = schema.find(f => resolver(f.name, name)).getOrElse {
+              throw new RuntimeException(s"$name is not a valid partition column in $schema.")
+            }
+            (name -> ident.get(index, field.dataType).toString)
+        }.toMap
+        PaimonTruncateTableCommand(table, partitionSpec)
+
       case _ => plan
     }
   }
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
index 0e930c82f..99c2a97ee 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
@@ -129,14 +129,40 @@ public class SparkWriteITCase {
     @Test
     public void testTruncateTable() {
         spark.sql(
-                "CREATE TABLE T (a INT, b INT, c STRING) TBLPROPERTIES"
-                        + " ('primary-key'='a', 'file.format'='avro')");
-        spark.sql("INSERT INTO T VALUES (1, 11, '111'), (2, 22, 
'222')").collectAsList();
-        spark.sql("TRUNCATE TABLE T").collectAsList();
+                "CREATE TABLE T (a INT, b INT, c STRING)"
+                        + " TBLPROPERTIES ('primary-key'='a', 
'file.format'='avro')");
+        spark.sql("INSERT INTO T VALUES (1, 11, '111'), (2, 22, '222')");
+        spark.sql("TRUNCATE TABLE T");
         List<Row> rows = spark.sql("SELECT * FROM T").collectAsList();
         assertThat(rows.toString()).isEqualTo("[]");
     }
 
+    @Test
+    public void testTruncatePartition1() {
+        spark.sql(
+                "CREATE TABLE T (a INT, b INT, c LONG) PARTITIONED BY (c)"
+                        + " TBLPROPERTIES ('primary-key'='a,c')");
+        spark.sql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
+
+        spark.sql("TRUNCATE TABLE T PARTITION (c = 111)");
+        List<Row> rows = spark.sql("SELECT * FROM T").collectAsList();
+        assertThat(rows.toString()).isEqualTo("[[2,22,222]]");
+    }
+
+    @Test
+    public void testTruncatePartition() {
+        spark.sql(
+                "CREATE TABLE T (a INT, b INT, c LONG, d STRING)"
+                        + " PARTITIONED BY (c,d)"
+                        + " TBLPROPERTIES ('primary-key'='a,c,d')");
+        spark.sql(
+                "INSERT INTO T VALUES (1, 11, 111, 'a'), (2, 22, 222, 'b'), 
(3, 33, 333, 'b'), (4, 44, 444, 'a')");
+
+        spark.sql("TRUNCATE TABLE T PARTITION (d = 'a')");
+        List<Row> rows = spark.sql("SELECT * FROM T ORDER BY 
a").collectAsList();
+        assertThat(rows.toString()).isEqualTo("[[2,22,222,b], [3,33,333,b]]");
+    }
+
     @Test
     public void testWriteDynamicBucketPartitionedTable() {
         spark.sql(

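A note on the resolution logic, as inferred from the rule above: the resolved
partition spec is converted to a Map[String, String] of partition column name
to value and handed to PaimonTruncateTableCommand, so the spec may cover only
a subset of the partition columns. A sketch under the same assumptions as
above (`spark` is a configured SparkSession):

    spark.sql(
      "CREATE TABLE T (a INT, b INT, c LONG, d STRING) PARTITIONED BY (c,d)" +
        " TBLPROPERTIES ('primary-key'='a,c,d')")
    spark.sql("INSERT INTO T VALUES (1, 11, 111, 'a'), (2, 22, 222, 'b')")
    // Truncating by d alone clears every partition with d = 'a', across all c values.
    spark.sql("TRUNCATE TABLE T PARTITION (d = 'a')")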