This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 69b815918 [spark] Use the value from options ​​instead of default 
value for 'partition.default-name' (#3462)
69b815918 is described below

commit 69b815918ef353540b9999255c8138b9d49bc28c
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Jun 11 19:21:17 2024 +0800

    [spark] Use the value from options ​​instead of default value for 
'partition.default-name' (#3462)
---
 .../java/org/apache/paimon/spark/SparkTable.java   | 149 ---------------------
 .../paimon/spark/PaimonPartitionManagement.scala   |  75 ++++-------
 .../scala/org/apache/paimon/spark/SparkTable.scala |  97 ++++++++++++++
 .../analysis/PaimonIncompatiblePHRRules.scala      |   2 +-
 .../commands/DeleteFromPaimonTableCommand.scala    |   5 +-
 .../spark/commands/MergeIntoPaimonTable.scala      |   2 +-
 ...bleTest.scala => DeleteFromTableTestBase.scala} |  23 +++-
 .../spark/sql/PaimonPartitionManagementTest.scala  |  20 +++
 8 files changed, 170 insertions(+), 203 deletions(-)

diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
deleted file mode 100644
index be9f4309a..000000000
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.spark;
-
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.spark.schema.PaimonMetadataColumn$;
-import org.apache.paimon.table.DataTable;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.Table;
-
-import org.apache.spark.sql.connector.catalog.MetadataColumn;
-import org.apache.spark.sql.connector.catalog.SupportsMetadataColumns;
-import org.apache.spark.sql.connector.catalog.SupportsRead;
-import org.apache.spark.sql.connector.catalog.SupportsWrite;
-import org.apache.spark.sql.connector.catalog.TableCapability;
-import org.apache.spark.sql.connector.catalog.TableCatalog;
-import org.apache.spark.sql.connector.expressions.FieldReference;
-import org.apache.spark.sql.connector.expressions.IdentityTransform;
-import org.apache.spark.sql.connector.expressions.Transform;
-import org.apache.spark.sql.connector.read.ScanBuilder;
-import org.apache.spark.sql.connector.write.LogicalWriteInfo;
-import org.apache.spark.sql.connector.write.WriteBuilder;
-import org.apache.spark.sql.types.StructType;
-import org.apache.spark.sql.util.CaseInsensitiveStringMap;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-
-/** A spark {@link org.apache.spark.sql.connector.catalog.Table} for paimon. */
-public class SparkTable
-        implements org.apache.spark.sql.connector.catalog.Table,
-                SupportsRead,
-                SupportsWrite,
-                SupportsMetadataColumns,
-                PaimonPartitionManagement {
-
-    private final Table table;
-
-    public SparkTable(Table table) {
-        this.table = table;
-    }
-
-    public Table getTable() {
-        return table;
-    }
-
-    @Override
-    public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
-        Table newTable = table.copy(options.asCaseSensitiveMap());
-        return new PaimonScanBuilder(newTable);
-    }
-
-    @Override
-    public String name() {
-        return table.name();
-    }
-
-    @Override
-    public StructType schema() {
-        return SparkTypeUtils.fromPaimonRowType(table.rowType());
-    }
-
-    @Override
-    public Set<TableCapability> capabilities() {
-        Set<TableCapability> capabilities = new HashSet<>();
-        capabilities.add(TableCapability.BATCH_READ);
-        capabilities.add(TableCapability.V1_BATCH_WRITE);
-        capabilities.add(TableCapability.OVERWRITE_BY_FILTER);
-        capabilities.add(TableCapability.OVERWRITE_DYNAMIC);
-        capabilities.add(TableCapability.MICRO_BATCH_READ);
-        return capabilities;
-    }
-
-    @Override
-    public Transform[] partitioning() {
-        return table.partitionKeys().stream()
-                .map(FieldReference::apply)
-                .map(IdentityTransform::apply)
-                .toArray(Transform[]::new);
-    }
-
-    @Override
-    public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
-        try {
-            return new SparkWriteBuilder((FileStoreTable) table, 
Options.fromMap(info.options()));
-        } catch (Exception e) {
-            throw new RuntimeException("Only FileStoreTable can be written.");
-        }
-    }
-
-    @Override
-    public Map<String, String> properties() {
-        if (table instanceof DataTable) {
-            Map<String, String> properties =
-                    new HashMap<>(((DataTable) table).coreOptions().toMap());
-            if (!table.primaryKeys().isEmpty()) {
-                properties.put(
-                        CoreOptions.PRIMARY_KEY.key(), String.join(",", 
table.primaryKeys()));
-            }
-            properties.put(TableCatalog.PROP_PROVIDER, SparkSource.NAME());
-            if (table.comment().isPresent()) {
-                properties.put(TableCatalog.PROP_COMMENT, 
table.comment().get());
-            }
-            return properties;
-        } else {
-            return Collections.emptyMap();
-        }
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        SparkTable that = (SparkTable) o;
-        return Objects.equals(table, that.table);
-    }
-
-    @Override
-    public MetadataColumn[] metadataColumns() {
-        return new MetadataColumn[] {
-            PaimonMetadataColumn$.MODULE$.FILE_PATH(), 
PaimonMetadataColumn$.MODULE$.ROW_INDEX()
-        };
-    }
-}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
index a803e1b62..85c85435e 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
@@ -18,12 +18,11 @@
 
 package org.apache.paimon.spark
 
-import org.apache.paimon.CoreOptions
 import org.apache.paimon.operation.FileStoreCommit
 import org.apache.paimon.table.FileStoreTable
 import org.apache.paimon.table.sink.BatchWriteBuilder
 import org.apache.paimon.types.RowType
-import org.apache.paimon.utils.InternalRowPartitionComputer
+import org.apache.paimon.utils.{InternalRowPartitionComputer, TypeUtils}
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
@@ -31,46 +30,39 @@ import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement
 import org.apache.spark.sql.types.StructType
 
-import java.util.{Collections, Map => JMap, UUID}
+import java.util.{Collections, Map => JMap, Objects, UUID}
 
 import scala.collection.JavaConverters._
 
 trait PaimonPartitionManagement extends SupportsPartitionManagement {
   self: SparkTable =>
 
-  def partitionKeys() = getTable.partitionKeys
+  private lazy val partitionRowType: RowType = 
TypeUtils.project(table.rowType, table.partitionKeys)
 
-  def tableRowType() = new RowType(
-    getTable.rowType.getFields.asScala
-      .filter(dataFiled => partitionKeys.contains(dataFiled.name()))
-      .asJava)
-
-  override def partitionSchema(): StructType = {
-    SparkTypeUtils.fromPaimonRowType(tableRowType)
-  }
+  override lazy val partitionSchema: StructType = 
SparkTypeUtils.fromPaimonRowType(partitionRowType)
 
   override def dropPartition(internalRow: InternalRow): Boolean = {
-    // convert internalRow to row
-    val row: Row = CatalystTypeConverters
-      
.createToScalaConverter(CharVarcharUtils.replaceCharVarcharWithString(partitionSchema()))
-      .apply(internalRow)
-      .asInstanceOf[Row]
-    val rowDataPartitionComputer = new InternalRowPartitionComputer(
-      CoreOptions.PARTITION_DEFAULT_NAME.defaultValue,
-      tableRowType,
-      partitionKeys.asScala.toArray)
-    val partitionMap = rowDataPartitionComputer.generatePartValues(new 
SparkRow(tableRowType, row))
-    getTable match {
-      case table: FileStoreTable =>
-        val commit: FileStoreCommit = 
table.store.newCommit(UUID.randomUUID.toString)
+    table match {
+      case fileStoreTable: FileStoreTable =>
+        // convert internalRow to row
+        val row: Row = CatalystTypeConverters
+          
.createToScalaConverter(CharVarcharUtils.replaceCharVarcharWithString(partitionSchema))
+          .apply(internalRow)
+          .asInstanceOf[Row]
+        val rowDataPartitionComputer = new InternalRowPartitionComputer(
+          fileStoreTable.coreOptions().partitionDefaultName(),
+          partitionRowType,
+          table.partitionKeys().asScala.toArray)
+        val partitionMap =
+          rowDataPartitionComputer.generatePartValues(new 
SparkRow(partitionRowType, row))
+        val commit: FileStoreCommit = 
fileStoreTable.store.newCommit(UUID.randomUUID.toString)
         commit.dropPartitions(
           Collections.singletonList(partitionMap),
           BatchWriteBuilder.COMMIT_IDENTIFIER)
+        true
       case _ =>
-        throw new UnsupportedOperationException(
-          "Only AbstractFileStoreTable supports drop partitions.")
+        throw new UnsupportedOperationException("Only FileStoreTable supports 
drop partitions.")
     }
-    true
   }
 
   override def replacePartitionMetadata(
@@ -91,32 +83,23 @@ trait PaimonPartitionManagement extends 
SupportsPartitionManagement {
       s"Number of partition names (${partitionCols.length}) must be equal to " 
+
         s"the number of partition values (${internalRow.numFields})."
     )
-    val schema: StructType = partitionSchema()
     assert(
-      partitionCols.forall(fieldName => schema.fieldNames.contains(fieldName)),
+      partitionCols.forall(fieldName => 
partitionSchema.fieldNames.contains(fieldName)),
       s"Some partition names ${partitionCols.mkString("[", ", ", "]")} don't 
belong to " +
-        s"the partition schema '${schema.sql}'."
+        s"the partition schema '${partitionSchema.sql}'."
     )
-
-    val tableScan = getTable.newReadBuilder.newScan
-    val binaryRows = tableScan.listPartitions.asScala.toList
-    binaryRows
-      .map(
-        binaryRow => {
-          val sparkInternalRow: SparkInternalRow =
-            new 
SparkInternalRow(SparkTypeUtils.toPaimonType(schema).asInstanceOf[RowType])
-          sparkInternalRow.replace(binaryRow)
-        })
+    table.newReadBuilder.newScan.listPartitions.asScala
+      .map(binaryRow => SparkInternalRow.fromPaimon(binaryRow, 
partitionRowType))
       .filter(
         sparkInternalRow => {
           partitionCols.zipWithIndex
             .map {
               case (partitionName, index) =>
-                val internalRowIndex = schema.fieldIndex(partitionName)
-                val structField = schema.fields(internalRowIndex)
-                sparkInternalRow
-                  .get(internalRowIndex, structField.dataType)
-                  .equals(internalRow.get(index, structField.dataType))
+                val internalRowIndex = 
partitionSchema.fieldIndex(partitionName)
+                val structField = partitionSchema.fields(internalRowIndex)
+                Objects.equals(
+                  sparkInternalRow.get(internalRowIndex, structField.dataType),
+                  internalRow.get(index, structField.dataType))
             }
             .forall(identity)
         })
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
new file mode 100644
index 000000000..29d0cfdb9
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark
+
+import org.apache.paimon.CoreOptions
+import org.apache.paimon.options.Options
+import org.apache.paimon.spark.schema.PaimonMetadataColumn
+import org.apache.paimon.table.{DataTable, FileStoreTable, Table}
+
+import org.apache.spark.sql.connector.catalog.{MetadataColumn, 
SupportsMetadataColumns, SupportsRead, SupportsWrite, TableCapability, 
TableCatalog}
+import org.apache.spark.sql.connector.expressions.{Expressions, Transform}
+import org.apache.spark.sql.connector.read.ScanBuilder
+import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import java.util.{Collections, EnumSet => JEnumSet, HashMap => JHashMap, Map 
=> JMap, Set => JSet}
+
+import scala.collection.JavaConverters._
+
+/** A spark [[org.apache.spark.sql.connector.catalog.Table]] for paimon. */
+case class SparkTable(table: Table)
+  extends org.apache.spark.sql.connector.catalog.Table
+  with SupportsRead
+  with SupportsWrite
+  with SupportsMetadataColumns
+  with PaimonPartitionManagement {
+
+  def getTable: Table = table
+
+  override def name: String = table.name
+
+  override lazy val schema: StructType = 
SparkTypeUtils.fromPaimonRowType(table.rowType)
+
+  override def partitioning: Array[Transform] = {
+    table.partitionKeys().asScala.map(p => Expressions.identity(p)).toArray
+  }
+
+  override def properties: JMap[String, String] = {
+    table match {
+      case dataTable: DataTable =>
+        val properties = new JHashMap[String, 
String](dataTable.coreOptions.toMap)
+        if (!table.primaryKeys.isEmpty) {
+          properties.put(CoreOptions.PRIMARY_KEY.key, String.join(",", 
table.primaryKeys))
+        }
+        properties.put(TableCatalog.PROP_PROVIDER, SparkSource.NAME)
+        if (table.comment.isPresent) {
+          properties.put(TableCatalog.PROP_COMMENT, table.comment.get)
+        }
+        properties
+      case _ => Collections.emptyMap()
+    }
+  }
+
+  override def capabilities: JSet[TableCapability] = {
+    JEnumSet.of(
+      TableCapability.BATCH_READ,
+      TableCapability.V1_BATCH_WRITE,
+      TableCapability.OVERWRITE_BY_FILTER,
+      TableCapability.OVERWRITE_DYNAMIC,
+      TableCapability.MICRO_BATCH_READ
+    )
+  }
+
+  override def metadataColumns: Array[MetadataColumn] = {
+    Array[MetadataColumn](PaimonMetadataColumn.FILE_PATH, 
PaimonMetadataColumn.ROW_INDEX)
+  }
+
+  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder 
= {
+    new PaimonScanBuilder(table.copy(options.asCaseSensitiveMap))
+  }
+
+  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
+    table match {
+      case fileStoreTable: FileStoreTable =>
+        new SparkWriteBuilder(fileStoreTable, Options.fromMap(info.options))
+      case _ =>
+        throw new RuntimeException("Only FileStoreTable can be written.")
+    }
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonIncompatiblePHRRules.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonIncompatiblePHRRules.scala
index 422f5e7e5..c5745f575 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonIncompatiblePHRRules.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonIncompatiblePHRRules.scala
@@ -34,7 +34,7 @@ case class PaimonIncompatiblePHRRules(session: SparkSession) 
extends Rule[Logica
           if t.resolved =>
         assert(names.length == ident.numFields, "Names and values of partition 
don't match")
         val resolver = session.sessionState.conf.resolver
-        val schema = table.schema()
+        val schema = table.schema
         val partitionSpec = names.zipWithIndex.map {
           case (name, index) =>
             val field = schema.find(f => resolver(f.name, name)).getOrElse {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index a2d807ed1..d7333b468 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -76,14 +76,13 @@ case class DeleteFromPaimonTableCommand(
 
       if (
         otherCondition.isEmpty && partitionPredicate.nonEmpty && !table
-          .store()
-          .options()
+          .coreOptions()
           .deleteForceProduceChangelog()
       ) {
         val matchedPartitions =
           
table.newSnapshotReader().withPartitionFilter(partitionPredicate.get).partitions().asScala
         val rowDataPartitionComputer = new InternalRowPartitionComputer(
-          CoreOptions.PARTITION_DEFAULT_NAME.defaultValue,
+          table.coreOptions().partitionDefaultName(),
           table.schema().logicalPartitionType(),
           table.partitionKeys.asScala.toArray
         )
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
index ae95b7548..a06bc437d 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
@@ -53,7 +53,7 @@ case class MergeIntoPaimonTable(
 
   override val table: FileStoreTable = 
v2Table.getTable.asInstanceOf[FileStoreTable]
 
-  lazy val tableSchema: StructType = v2Table.schema()
+  lazy val tableSchema: StructType = v2Table.schema
 
   lazy val filteredTargetPlan: LogicalPlan = {
     val filtersOnlyTarget = getExpressionOnlyRelated(mergeCondition, 
targetTable)
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
similarity index 95%
rename from 
paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
rename to 
paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
index 8c879e213..aaaf5867e 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
@@ -21,7 +21,6 @@ package org.apache.paimon.spark.sql
 import org.apache.paimon.CoreOptions
 import org.apache.paimon.CoreOptions.MergeEngine
 import org.apache.paimon.spark.PaimonSparkTestBase
-import org.apache.paimon.spark.catalyst.analysis.Delete
 
 import org.apache.spark.sql.Row
 import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}
@@ -355,6 +354,24 @@ abstract class DeleteFromTableTestBase extends 
PaimonSparkTestBase {
     assertThat(spark.sql("SELECT * FROM `T$audit_log` WHERE 
rowkind='-D'").collectAsList().size())
       .isEqualTo(3)
   }
-}
 
-class DeleteFromTableTest extends DeleteFromTableTestBase {}
+  test("Paimon Delete: delete null partition with specified default partition 
name") {
+    spark.sql(s"""
+                 |CREATE TABLE T (a INT, dt STRING)
+                 |PARTITIONED BY (dt)
+                 |TBLPROPERTIES 
('partition.default-name'='__TEST_DEFAULT_PARTITION__')
+                 |""".stripMargin)
+
+    spark.sql("INSERT INTO T VALUES (1, '20240601'), (2, null)")
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY a"),
+      Row(1, "20240601") :: Row(2, null) :: Nil
+    )
+
+    spark.sql("DELETE FROM T WHERE dt IS null")
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY a"),
+      Row(1, "20240601") :: Nil
+    )
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPartitionManagementTest.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPartitionManagementTest.scala
index 5f4c90cfe..bb24b427f 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPartitionManagementTest.scala
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPartitionManagementTest.scala
@@ -176,4 +176,24 @@ class PaimonPartitionManagementTest extends 
PaimonSparkTestBase {
           }
       }
   }
+
+  test("Paimon Partition Management: drop null partition with specified 
default partition name") {
+    spark.sql(s"""
+                 |CREATE TABLE T (a INT, dt STRING)
+                 |PARTITIONED BY (dt)
+                 |TBLPROPERTIES 
('partition.default-name'='__TEST_DEFAULT_PARTITION__')
+                 |""".stripMargin)
+
+    spark.sql("INSERT INTO T VALUES (1, '20240601'), (2, null)")
+    checkAnswer(
+      spark.sql("SHOW PARTITIONS T"),
+      Row("dt=20240601") :: Row("dt=null") :: Nil
+    )
+
+    spark.sql("ALTER TABLE T DROP PARTITION (dt=null)")
+    checkAnswer(
+      spark.sql("SHOW PARTITIONS T"),
+      Row("dt=20240601") :: Nil
+    )
+  }
 }

Reply via email to