This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-0.5
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/release-0.5 by this push:
new e5f3480c7 [Spark] Spark supports PARTITION MANAGEMENT (#1836)
e5f3480c7 is described below
commit e5f3480c745e94b4117a5cca360a64496c7e3d43
Author: chouc <[email protected]>
AuthorDate: Fri Aug 25 15:26:35 2023 +0800
[Spark] Spark supports PARTITION MANAGEMENT (#1836)
---
.../java/org/apache/paimon/spark/SparkTable.java | 3 +-
.../paimon/spark/PaimonPartitionManagement.scala | 129 ++++++++++++++++
.../spark/sql/PaimonPartitionManagementTest.scala | 169 +++++++++++++++++++++
3 files changed, 300 insertions(+), 1 deletion(-)
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
index 99d6513cb..77a5909a4 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
@@ -53,7 +53,8 @@ public class SparkTable
implements org.apache.spark.sql.connector.catalog.Table,
SupportsRead,
SupportsWrite,
- SupportsDelete {
+ SupportsDelete,
+ PaimonPartitionManagement {
private final Table table;
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
new file mode 100644
index 000000000..16bd91145
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark
+
+import org.apache.paimon.data.BinaryRow
+import org.apache.paimon.operation.FileStoreCommit
+import org.apache.paimon.table.{AbstractFileStoreTable, Table}
+import org.apache.paimon.table.sink.BatchWriteBuilder
+import org.apache.paimon.table.source.TableScan
+import org.apache.paimon.types.RowType
+import org.apache.paimon.utils.{FileStorePathFactory, RowDataPartitionComputer}
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement
+import org.apache.spark.sql.types.StructType
+
+import java.util
+import java.util.{Collections, UUID}
+
+import scala.collection.JavaConverters._
+
+/**
+ * Spark partition-management support mixed into [[SparkTable]].
+ *
+ * Only listing partitions and dropping a single partition are implemented; creating partitions
+ * and reading or replacing partition metadata throw [[UnsupportedOperationException]].
+ */
+trait PaimonPartitionManagement extends SupportsPartitionManagement {
+  self: SparkTable =>
+
+  /** Names of the partition columns, as reported by the underlying Paimon table. */
+  def partitionKeys(): util.List[String] = getTable.partitionKeys
+
+  /**
+   * Paimon row type containing only the partition columns.
+   *
+   * Fields are emitted in the order of [[partitionKeys]] rather than in table-column order, so
+   * that the type lines up with partition rows when the two orders differ.
+   * NOTE(review): this assumes partition rows returned by the table scan follow the declared
+   * partition-key order - confirm against the Paimon version in use.
+   */
+  def tableRowType(): RowType = {
+    val fieldsByName = getTable.rowType.getFields.asScala.map(field => field.name() -> field).toMap
+    new RowType(partitionKeys().asScala.flatMap(key => fieldsByName.get(key)).asJava)
+  }
+
+  /** Spark view of [[tableRowType]]. */
+  override def partitionSchema(): StructType = {
+    SparkTypeUtils.fromPaimonRowType(tableRowType())
+  }
+
+  /**
+   * Drops the partition identified by `internalRow`.
+   *
+   * @param internalRow partition values laid out according to [[partitionSchema]]
+   * @return always `true` once the drop has been committed
+   * @throws UnsupportedOperationException if the table is not an `AbstractFileStoreTable`
+   */
+  override def dropPartition(internalRow: InternalRow): Boolean = {
+    val partitionType = tableRowType()
+    // Convert the Catalyst row to an external Row so it can be wrapped as a Paimon row.
+    val row: Row = CatalystTypeConverters
+      .createToScalaConverter(SparkTypeUtils.fromPaimonRowType(partitionType))
+      .apply(internalRow)
+      .asInstanceOf[Row]
+    val rowDataPartitionComputer = new RowDataPartitionComputer(
+      FileStorePathFactory.PARTITION_DEFAULT_NAME.defaultValue,
+      partitionType,
+      partitionKeys().asScala.toArray)
+    val partitionMap = rowDataPartitionComputer.generatePartValues(new SparkRow(partitionType, row))
+    getTable match {
+      case table: AbstractFileStoreTable =>
+        val commit: FileStoreCommit = table.store.newCommit(UUID.randomUUID.toString)
+        commit.dropPartitions(
+          Collections.singletonList(partitionMap),
+          BatchWriteBuilder.COMMIT_IDENTIFIER)
+      case _ =>
+        throw new UnsupportedOperationException(
+          "Only AbstractFileStoreTable supports drop partitions.")
+    }
+    true
+  }
+
+  /** Not supported; always throws [[UnsupportedOperationException]]. */
+  override def replacePartitionMetadata(
+      ident: InternalRow,
+      properties: util.Map[String, String]): Unit = {
+    throw new UnsupportedOperationException("Replace partition is not supported")
+  }
+
+  /** Not supported; always throws [[UnsupportedOperationException]]. */
+  override def loadPartitionMetadata(ident: InternalRow): util.Map[String, String] = {
+    throw new UnsupportedOperationException("Load partition is not supported")
+  }
+
+  /**
+   * Lists the partitions whose values match the given (possibly partial) partition spec.
+   *
+   * @param partitionCols names of the partition columns constrained by the spec
+   * @param internalRow   the constraint values; field `i` belongs to `partitionCols(i)`
+   * @return every partition of the table whose values equal the spec on the named columns
+   */
+  override def listPartitionIdentifiers(
+      partitionCols: Array[String],
+      internalRow: InternalRow): Array[InternalRow] = {
+    assert(
+      partitionCols.length == internalRow.numFields,
+      s"Number of partition names (${partitionCols.length}) must be equal to " +
+        s"the number of partition values (${internalRow.numFields})."
+    )
+    val schema: StructType = partitionSchema()
+    assert(
+      partitionCols.forall(fieldName => schema.fieldNames.contains(fieldName)),
+      s"Some partition names ${partitionCols.mkString("[", ", ", "]")} don't belong to " +
+        s"the partition schema '${schema.sql}'."
+    )
+
+    // Loop-invariant: the Paimon type used to wrap each partition row.
+    val paimonPartitionType = SparkTypeUtils.toPaimonType(schema).asInstanceOf[RowType]
+
+    getTable.newReadBuilder.newScan.listPartitions.asScala.toList
+      // A fresh wrapper per partition: the wrappers are returned to the caller.
+      .map(binaryRow => new SparkInternalRow(paimonPartitionType).replace(binaryRow))
+      .filter {
+        partitionRow =>
+          partitionCols.zipWithIndex.forall {
+            case (partitionName, specIndex) =>
+              // `specIndex` addresses the caller's spec; `schemaIndex` addresses the partition
+              // row. The column type must come from the schema position, not the spec position.
+              val schemaIndex = schema.fieldIndex(partitionName)
+              val dataType = schema.fields(schemaIndex).dataType
+              // `==` is null-safe, unlike calling `.equals` on a possibly-null value.
+              partitionRow.get(schemaIndex, dataType) == internalRow.get(specIndex, dataType)
+          }
+      }
+      .toArray[InternalRow]
+  }
+
+  /** Not supported; always throws [[UnsupportedOperationException]]. */
+  override def createPartition(ident: InternalRow, properties: util.Map[String, String]): Unit = {
+    throw new UnsupportedOperationException("Create partition is not supported")
+  }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPartitionManagementTest.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPartitionManagementTest.scala
new file mode 100644
index 000000000..0b1a33135
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPartitionManagementTest.scala
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.WriteMode.CHANGE_LOG
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.{AnalysisException, Row}
+
+/**
+ * Runs `SHOW PARTITIONS` and `ALTER TABLE ... DROP PARTITION` against a table `T`, once without
+ * and once with partition columns, for every combination of write mode and bucket mode.
+ */
+class PaimonPartitionManagementTest extends PaimonSparkTestBase {
+
+  /** The `'primary-key'` table property (with trailing comma), or empty when not requested. */
+  private def primaryKeyClause(changelog: Boolean, keys: String): String =
+    if (changelog) s"'primary-key'='$keys'," else ""
+
+  /** Inserts six rows into `T`: three `hh` values for each of the two `dt` values. */
+  private def insertSampleRows(): Unit =
+    for {
+      (c, dt) <- Seq(1 -> "20230816", 2 -> "20230817")
+      hh <- Seq("1132", "1133", "1134")
+    } spark.sql(s"INSERT INTO T VALUES('a','b',$c,'$dt','$hh')")
+
+  for {
+    writeMode <- writeModes
+    bucket <- bucketModes
+  } {
+    test(s"Partition for non-partitioned table: write-mode: $writeMode, bucket: $bucket") {
+      val pkProp = primaryKeyClause(writeMode == CHANGE_LOG, "a,b")
+      spark.sql(
+        s"""
+           |CREATE TABLE T (a VARCHAR(10), b CHAR(10),c BIGINT,dt VARCHAR(8),hh VARCHAR(4))
+           |TBLPROPERTIES ($pkProp 'write-mode'='${writeMode.toString}', 'bucket'='$bucket')
+           |""".stripMargin)
+      insertSampleRows()
+
+      // Without partition columns, every partition spec is expected to be rejected.
+      Seq(
+        "alter table T drop partition (dt='20230816', hh='1134')",
+        "alter table T drop partition (dt='20230816')",
+        "alter table T drop partition (hh='1134')",
+        "show partitions T partition (dt='20230816', hh='1134')",
+        "show partitions T partition (dt='20230816')",
+        "show partitions T partition (hh='1134')"
+      ).foreach(statement => assertThrows[AnalysisException](spark.sql(statement)))
+    }
+  }
+
+  for {
+    writeMode <- writeModes
+    bucket <- bucketModes
+  } {
+    test(s"Partition for partitioned table: write-mode: $writeMode, bucket: $bucket") {
+      val pkProp = primaryKeyClause(writeMode == CHANGE_LOG, "a,b,dt,hh")
+      spark.sql(
+        s"""
+           |CREATE TABLE T (a VARCHAR(10), b CHAR(10),c BIGINT,dt VARCHAR(8),hh VARCHAR(4))
+           |PARTITIONED BY (dt, hh)
+           |TBLPROPERTIES ($pkProp 'write-mode'='${writeMode.toString}', 'bucket'='$bucket')
+           |""".stripMargin)
+
+      insertSampleRows()
+
+      // Listing: all partitions, a full spec, partial specs, and specs matching nothing.
+      checkAnswer(
+        spark.sql("show partitions T "),
+        Seq(
+          Row("dt=20230816/hh=1132"),
+          Row("dt=20230816/hh=1133"),
+          Row("dt=20230816/hh=1134"),
+          Row("dt=20230817/hh=1132"),
+          Row("dt=20230817/hh=1133"),
+          Row("dt=20230817/hh=1134")
+        )
+      )
+
+      checkAnswer(
+        spark.sql("show partitions T PARTITION (dt='20230817', hh='1132')"),
+        Seq(Row("dt=20230817/hh=1132")))
+
+      checkAnswer(
+        spark.sql("show partitions T PARTITION (hh='1132')"),
+        Seq(Row("dt=20230816/hh=1132"), Row("dt=20230817/hh=1132")))
+
+      checkAnswer(
+        spark.sql("show partitions T PARTITION (dt='20230816')"),
+        Seq(Row("dt=20230816/hh=1132"), Row("dt=20230816/hh=1133"), Row("dt=20230816/hh=1134")))
+
+      checkAnswer(spark.sql("show partitions T PARTITION (hh='1135')"), Seq.empty[Row])
+
+      checkAnswer(spark.sql("show partitions T PARTITION (dt='20230818')"), Seq.empty[Row])
+
+      // Dropping: fully specified partitions succeed, partial specs are rejected.
+      spark.sql("alter table T drop partition (dt='20230816', hh='1134')")
+
+      spark.sql("alter table T drop partition (dt='20230817', hh='1133')")
+
+      Seq(
+        "alter table T drop partition (dt='20230816')",
+        "alter table T drop partition (hh='1134')"
+      ).foreach(statement => assertThrows[AnalysisException](spark.sql(statement)))
+
+      // Listing again after the two drops.
+      checkAnswer(
+        spark.sql("show partitions T "),
+        Seq(
+          Row("dt=20230816/hh=1132"),
+          Row("dt=20230816/hh=1133"),
+          Row("dt=20230817/hh=1132"),
+          Row("dt=20230817/hh=1134")
+        )
+      )
+
+      checkAnswer(
+        spark.sql("show partitions T PARTITION (dt='20230817', hh='1132')"),
+        Seq(Row("dt=20230817/hh=1132")))
+
+      checkAnswer(
+        spark.sql("show partitions T PARTITION (hh='1132')"),
+        Seq(Row("dt=20230817/hh=1132"), Row("dt=20230816/hh=1132")))
+
+      checkAnswer(
+        spark.sql("show partitions T PARTITION (hh='1134')"),
+        Seq(Row("dt=20230817/hh=1134")))
+
+      checkAnswer(
+        spark.sql("show partitions T PARTITION (dt='20230817')"),
+        Seq(Row("dt=20230817/hh=1132"), Row("dt=20230817/hh=1134")))
+
+      // The rows of the dropped partitions are gone from the table data as well.
+      checkAnswer(
+        spark.sql("select * from T"),
+        Seq(
+          Row("a", "b", 1L, "20230816", "1132"),
+          Row("a", "b", 1L, "20230816", "1133"),
+          Row("a", "b", 2L, "20230817", "1132"),
+          Row("a", "b", 2L, "20230817", "1134")
+        )
+      )
+    }
+  }
+}