This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5a5f48622 [Spark] Truncate Partition (#2371)
5a5f48622 is described below
commit 5a5f48622283bbec4fb466edeb32b6b67e8897eb
Author: Yann Byron <[email protected]>
AuthorDate: Thu Nov 23 13:06:56 2023 +0800
[Spark] Truncate Partition (#2371)
---
.../sql/catalyst/analysis/PaimonAnalysis.scala | 16 +++++++++-
.../org/apache/paimon/spark/SparkWriteITCase.java | 34 +++++++++++++++++++---
2 files changed, 45 insertions(+), 5 deletions(-)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonAnalysis.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonAnalysis.scala
index caca14c73..377539b0a 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonAnalysis.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonAnalysis.scala
@@ -23,7 +23,7 @@ import org.apache.paimon.table.FileStoreTable
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.PaimonRelation.isPaimonTable
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, MergeIntoTable, OverwritePartitionsDynamic, PaimonTableValuedFunctions, PaimonTableValueFunction, TruncateTable}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, MergeIntoTable, OverwritePartitionsDynamic, PaimonTableValuedFunctions, PaimonTableValueFunction, TruncatePartition, TruncateTable}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
@@ -50,6 +50,20 @@ case class PaimonPostHocResolutionRules(session: SparkSession) extends Rule[Logi
case t @ TruncateTable(PaimonRelation(table)) if t.resolved =>
PaimonTruncateTableCommand(table, Map.empty)
+      case t @ TruncatePartition(PaimonRelation(table), ResolvedPartitionSpec(names, ident, _))
+ if t.resolved =>
+        assert(names.length == ident.numFields, "Names and values of partition don't match")
+ val resolver = session.sessionState.conf.resolver
+ val schema = table.schema()
+ val partitionSpec = names.zipWithIndex.map {
+ case (name, index) =>
+ val field = schema.find(f => resolver(f.name, name)).getOrElse {
+              throw new RuntimeException(s"$name is not a valid partition column in $schema.")
+ }
+ (name -> ident.get(index, field.dataType).toString)
+ }.toMap
+ PaimonTruncateTableCommand(table, partitionSpec)
+
case _ => plan
}
}
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
index 0e930c82f..99c2a97ee 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
@@ -129,14 +129,40 @@ public class SparkWriteITCase {
@Test
public void testTruncateTable() {
spark.sql(
- "CREATE TABLE T (a INT, b INT, c STRING) TBLPROPERTIES"
- + " ('primary-key'='a', 'file.format'='avro')");
-        spark.sql("INSERT INTO T VALUES (1, 11, '111'), (2, 22, '222')").collectAsList();
-        spark.sql("TRUNCATE TABLE T").collectAsList();
+ "CREATE TABLE T (a INT, b INT, c STRING)"
+                        + " TBLPROPERTIES ('primary-key'='a', 'file.format'='avro')");
+ spark.sql("INSERT INTO T VALUES (1, 11, '111'), (2, 22, '222')");
+ spark.sql("TRUNCATE TABLE T");
List<Row> rows = spark.sql("SELECT * FROM T").collectAsList();
assertThat(rows.toString()).isEqualTo("[]");
}
+ @Test
+ public void testTruncatePartition1() {
+ spark.sql(
+ "CREATE TABLE T (a INT, b INT, c LONG) PARTITIONED BY (c)"
+ + " TBLPROPERTIES ('primary-key'='a,c')");
+ spark.sql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
+
+ spark.sql("TRUNCATE TABLE T PARTITION (c = 111)");
+ List<Row> rows = spark.sql("SELECT * FROM T").collectAsList();
+ assertThat(rows.toString()).isEqualTo("[[2,22,222]]");
+ }
+
+ @Test
+ public void testTruncatePartition() {
+ spark.sql(
+ "CREATE TABLE T (a INT, b INT, c LONG, d STRING)"
+ + " PARTITIONED BY (c,d)"
+ + " TBLPROPERTIES ('primary-key'='a,c,d')");
+ spark.sql(
+                "INSERT INTO T VALUES (1, 11, 111, 'a'), (2, 22, 222, 'b'), (3, 33, 333, 'b'), (4, 44, 444, 'a')");
+
+ spark.sql("TRUNCATE TABLE T PARTITION (d = 'a')");
+        List<Row> rows = spark.sql("SELECT * FROM T ORDER BY a").collectAsList();
+ assertThat(rows.toString()).isEqualTo("[[2,22,222,b], [3,33,333,b]]");
+ }
+
@Test
public void testWriteDynamicBucketPartitionedTable() {
spark.sql(