This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 69b815918 [spark] Use the value from options instead of default
value for 'partition.default-name' (#3462)
69b815918 is described below
commit 69b815918ef353540b9999255c8138b9d49bc28c
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Jun 11 19:21:17 2024 +0800
[spark] Use the value from options instead of default value for
'partition.default-name' (#3462)
---
.../java/org/apache/paimon/spark/SparkTable.java | 149 ---------------------
.../paimon/spark/PaimonPartitionManagement.scala | 75 ++++-------
.../scala/org/apache/paimon/spark/SparkTable.scala | 97 ++++++++++++++
.../analysis/PaimonIncompatiblePHRRules.scala | 2 +-
.../commands/DeleteFromPaimonTableCommand.scala | 5 +-
.../spark/commands/MergeIntoPaimonTable.scala | 2 +-
...bleTest.scala => DeleteFromTableTestBase.scala} | 23 +++-
.../spark/sql/PaimonPartitionManagementTest.scala | 20 +++
8 files changed, 170 insertions(+), 203 deletions(-)
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
deleted file mode 100644
index be9f4309a..000000000
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.spark;
-
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.spark.schema.PaimonMetadataColumn$;
-import org.apache.paimon.table.DataTable;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.Table;
-
-import org.apache.spark.sql.connector.catalog.MetadataColumn;
-import org.apache.spark.sql.connector.catalog.SupportsMetadataColumns;
-import org.apache.spark.sql.connector.catalog.SupportsRead;
-import org.apache.spark.sql.connector.catalog.SupportsWrite;
-import org.apache.spark.sql.connector.catalog.TableCapability;
-import org.apache.spark.sql.connector.catalog.TableCatalog;
-import org.apache.spark.sql.connector.expressions.FieldReference;
-import org.apache.spark.sql.connector.expressions.IdentityTransform;
-import org.apache.spark.sql.connector.expressions.Transform;
-import org.apache.spark.sql.connector.read.ScanBuilder;
-import org.apache.spark.sql.connector.write.LogicalWriteInfo;
-import org.apache.spark.sql.connector.write.WriteBuilder;
-import org.apache.spark.sql.types.StructType;
-import org.apache.spark.sql.util.CaseInsensitiveStringMap;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-
-/** A spark {@link org.apache.spark.sql.connector.catalog.Table} for paimon. */
-public class SparkTable
- implements org.apache.spark.sql.connector.catalog.Table,
- SupportsRead,
- SupportsWrite,
- SupportsMetadataColumns,
- PaimonPartitionManagement {
-
- private final Table table;
-
- public SparkTable(Table table) {
- this.table = table;
- }
-
- public Table getTable() {
- return table;
- }
-
- @Override
- public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
- Table newTable = table.copy(options.asCaseSensitiveMap());
- return new PaimonScanBuilder(newTable);
- }
-
- @Override
- public String name() {
- return table.name();
- }
-
- @Override
- public StructType schema() {
- return SparkTypeUtils.fromPaimonRowType(table.rowType());
- }
-
- @Override
- public Set<TableCapability> capabilities() {
- Set<TableCapability> capabilities = new HashSet<>();
- capabilities.add(TableCapability.BATCH_READ);
- capabilities.add(TableCapability.V1_BATCH_WRITE);
- capabilities.add(TableCapability.OVERWRITE_BY_FILTER);
- capabilities.add(TableCapability.OVERWRITE_DYNAMIC);
- capabilities.add(TableCapability.MICRO_BATCH_READ);
- return capabilities;
- }
-
- @Override
- public Transform[] partitioning() {
- return table.partitionKeys().stream()
- .map(FieldReference::apply)
- .map(IdentityTransform::apply)
- .toArray(Transform[]::new);
- }
-
- @Override
- public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
- try {
- return new SparkWriteBuilder((FileStoreTable) table,
Options.fromMap(info.options()));
- } catch (Exception e) {
- throw new RuntimeException("Only FileStoreTable can be written.");
- }
- }
-
- @Override
- public Map<String, String> properties() {
- if (table instanceof DataTable) {
- Map<String, String> properties =
- new HashMap<>(((DataTable) table).coreOptions().toMap());
- if (!table.primaryKeys().isEmpty()) {
- properties.put(
- CoreOptions.PRIMARY_KEY.key(), String.join(",",
table.primaryKeys()));
- }
- properties.put(TableCatalog.PROP_PROVIDER, SparkSource.NAME());
- if (table.comment().isPresent()) {
- properties.put(TableCatalog.PROP_COMMENT,
table.comment().get());
- }
- return properties;
- } else {
- return Collections.emptyMap();
- }
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- SparkTable that = (SparkTable) o;
- return Objects.equals(table, that.table);
- }
-
- @Override
- public MetadataColumn[] metadataColumns() {
- return new MetadataColumn[] {
- PaimonMetadataColumn$.MODULE$.FILE_PATH(),
PaimonMetadataColumn$.MODULE$.ROW_INDEX()
- };
- }
-}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
index a803e1b62..85c85435e 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
@@ -18,12 +18,11 @@
package org.apache.paimon.spark
-import org.apache.paimon.CoreOptions
import org.apache.paimon.operation.FileStoreCommit
import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.table.sink.BatchWriteBuilder
import org.apache.paimon.types.RowType
-import org.apache.paimon.utils.InternalRowPartitionComputer
+import org.apache.paimon.utils.{InternalRowPartitionComputer, TypeUtils}
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
@@ -31,46 +30,39 @@ import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement
import org.apache.spark.sql.types.StructType
-import java.util.{Collections, Map => JMap, UUID}
+import java.util.{Collections, Map => JMap, Objects, UUID}
import scala.collection.JavaConverters._
trait PaimonPartitionManagement extends SupportsPartitionManagement {
self: SparkTable =>
- def partitionKeys() = getTable.partitionKeys
+ private lazy val partitionRowType: RowType =
TypeUtils.project(table.rowType, table.partitionKeys)
- def tableRowType() = new RowType(
- getTable.rowType.getFields.asScala
- .filter(dataFiled => partitionKeys.contains(dataFiled.name()))
- .asJava)
-
- override def partitionSchema(): StructType = {
- SparkTypeUtils.fromPaimonRowType(tableRowType)
- }
+ override lazy val partitionSchema: StructType =
SparkTypeUtils.fromPaimonRowType(partitionRowType)
override def dropPartition(internalRow: InternalRow): Boolean = {
- // convert internalRow to row
- val row: Row = CatalystTypeConverters
-
.createToScalaConverter(CharVarcharUtils.replaceCharVarcharWithString(partitionSchema()))
- .apply(internalRow)
- .asInstanceOf[Row]
- val rowDataPartitionComputer = new InternalRowPartitionComputer(
- CoreOptions.PARTITION_DEFAULT_NAME.defaultValue,
- tableRowType,
- partitionKeys.asScala.toArray)
- val partitionMap = rowDataPartitionComputer.generatePartValues(new
SparkRow(tableRowType, row))
- getTable match {
- case table: FileStoreTable =>
- val commit: FileStoreCommit =
table.store.newCommit(UUID.randomUUID.toString)
+ table match {
+ case fileStoreTable: FileStoreTable =>
+ // convert internalRow to row
+ val row: Row = CatalystTypeConverters
+
.createToScalaConverter(CharVarcharUtils.replaceCharVarcharWithString(partitionSchema))
+ .apply(internalRow)
+ .asInstanceOf[Row]
+ val rowDataPartitionComputer = new InternalRowPartitionComputer(
+ fileStoreTable.coreOptions().partitionDefaultName(),
+ partitionRowType,
+ table.partitionKeys().asScala.toArray)
+ val partitionMap =
+ rowDataPartitionComputer.generatePartValues(new
SparkRow(partitionRowType, row))
+ val commit: FileStoreCommit =
fileStoreTable.store.newCommit(UUID.randomUUID.toString)
commit.dropPartitions(
Collections.singletonList(partitionMap),
BatchWriteBuilder.COMMIT_IDENTIFIER)
+ true
case _ =>
- throw new UnsupportedOperationException(
- "Only AbstractFileStoreTable supports drop partitions.")
+ throw new UnsupportedOperationException("Only FileStoreTable supports
drop partitions.")
}
- true
}
override def replacePartitionMetadata(
@@ -91,32 +83,23 @@ trait PaimonPartitionManagement extends
SupportsPartitionManagement {
s"Number of partition names (${partitionCols.length}) must be equal to "
+
s"the number of partition values (${internalRow.numFields})."
)
- val schema: StructType = partitionSchema()
assert(
- partitionCols.forall(fieldName => schema.fieldNames.contains(fieldName)),
+ partitionCols.forall(fieldName =>
partitionSchema.fieldNames.contains(fieldName)),
s"Some partition names ${partitionCols.mkString("[", ", ", "]")} don't
belong to " +
- s"the partition schema '${schema.sql}'."
+ s"the partition schema '${partitionSchema.sql}'."
)
-
- val tableScan = getTable.newReadBuilder.newScan
- val binaryRows = tableScan.listPartitions.asScala.toList
- binaryRows
- .map(
- binaryRow => {
- val sparkInternalRow: SparkInternalRow =
- new
SparkInternalRow(SparkTypeUtils.toPaimonType(schema).asInstanceOf[RowType])
- sparkInternalRow.replace(binaryRow)
- })
+ table.newReadBuilder.newScan.listPartitions.asScala
+ .map(binaryRow => SparkInternalRow.fromPaimon(binaryRow,
partitionRowType))
.filter(
sparkInternalRow => {
partitionCols.zipWithIndex
.map {
case (partitionName, index) =>
- val internalRowIndex = schema.fieldIndex(partitionName)
- val structField = schema.fields(internalRowIndex)
- sparkInternalRow
- .get(internalRowIndex, structField.dataType)
- .equals(internalRow.get(index, structField.dataType))
+ val internalRowIndex =
partitionSchema.fieldIndex(partitionName)
+ val structField = partitionSchema.fields(internalRowIndex)
+ Objects.equals(
+ sparkInternalRow.get(internalRowIndex, structField.dataType),
+ internalRow.get(index, structField.dataType))
}
.forall(identity)
})
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
new file mode 100644
index 000000000..29d0cfdb9
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark
+
+import org.apache.paimon.CoreOptions
+import org.apache.paimon.options.Options
+import org.apache.paimon.spark.schema.PaimonMetadataColumn
+import org.apache.paimon.table.{DataTable, FileStoreTable, Table}
+
+import org.apache.spark.sql.connector.catalog.{MetadataColumn,
SupportsMetadataColumns, SupportsRead, SupportsWrite, TableCapability,
TableCatalog}
+import org.apache.spark.sql.connector.expressions.{Expressions, Transform}
+import org.apache.spark.sql.connector.read.ScanBuilder
+import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import java.util.{Collections, EnumSet => JEnumSet, HashMap => JHashMap, Map
=> JMap, Set => JSet}
+
+import scala.collection.JavaConverters._
+
+/** A spark [[org.apache.spark.sql.connector.catalog.Table]] for paimon. */
+case class SparkTable(table: Table)
+ extends org.apache.spark.sql.connector.catalog.Table
+ with SupportsRead
+ with SupportsWrite
+ with SupportsMetadataColumns
+ with PaimonPartitionManagement {
+
+ def getTable: Table = table
+
+ override def name: String = table.name
+
+ override lazy val schema: StructType =
SparkTypeUtils.fromPaimonRowType(table.rowType)
+
+ override def partitioning: Array[Transform] = {
+ table.partitionKeys().asScala.map(p => Expressions.identity(p)).toArray
+ }
+
+ override def properties: JMap[String, String] = {
+ table match {
+ case dataTable: DataTable =>
+ val properties = new JHashMap[String,
String](dataTable.coreOptions.toMap)
+ if (!table.primaryKeys.isEmpty) {
+ properties.put(CoreOptions.PRIMARY_KEY.key, String.join(",",
table.primaryKeys))
+ }
+ properties.put(TableCatalog.PROP_PROVIDER, SparkSource.NAME)
+ if (table.comment.isPresent) {
+ properties.put(TableCatalog.PROP_COMMENT, table.comment.get)
+ }
+ properties
+ case _ => Collections.emptyMap()
+ }
+ }
+
+ override def capabilities: JSet[TableCapability] = {
+ JEnumSet.of(
+ TableCapability.BATCH_READ,
+ TableCapability.V1_BATCH_WRITE,
+ TableCapability.OVERWRITE_BY_FILTER,
+ TableCapability.OVERWRITE_DYNAMIC,
+ TableCapability.MICRO_BATCH_READ
+ )
+ }
+
+ override def metadataColumns: Array[MetadataColumn] = {
+ Array[MetadataColumn](PaimonMetadataColumn.FILE_PATH,
PaimonMetadataColumn.ROW_INDEX)
+ }
+
+ override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder
= {
+ new PaimonScanBuilder(table.copy(options.asCaseSensitiveMap))
+ }
+
+ override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
+ table match {
+ case fileStoreTable: FileStoreTable =>
+ new SparkWriteBuilder(fileStoreTable, Options.fromMap(info.options))
+ case _ =>
+ throw new RuntimeException("Only FileStoreTable can be written.")
+ }
+ }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonIncompatiblePHRRules.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonIncompatiblePHRRules.scala
index 422f5e7e5..c5745f575 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonIncompatiblePHRRules.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonIncompatiblePHRRules.scala
@@ -34,7 +34,7 @@ case class PaimonIncompatiblePHRRules(session: SparkSession)
extends Rule[Logica
if t.resolved =>
assert(names.length == ident.numFields, "Names and values of partition
don't match")
val resolver = session.sessionState.conf.resolver
- val schema = table.schema()
+ val schema = table.schema
val partitionSpec = names.zipWithIndex.map {
case (name, index) =>
val field = schema.find(f => resolver(f.name, name)).getOrElse {
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index a2d807ed1..d7333b468 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -76,14 +76,13 @@ case class DeleteFromPaimonTableCommand(
if (
otherCondition.isEmpty && partitionPredicate.nonEmpty && !table
- .store()
- .options()
+ .coreOptions()
.deleteForceProduceChangelog()
) {
val matchedPartitions =
table.newSnapshotReader().withPartitionFilter(partitionPredicate.get).partitions().asScala
val rowDataPartitionComputer = new InternalRowPartitionComputer(
- CoreOptions.PARTITION_DEFAULT_NAME.defaultValue,
+ table.coreOptions().partitionDefaultName(),
table.schema().logicalPartitionType(),
table.partitionKeys.asScala.toArray
)
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
index ae95b7548..a06bc437d 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
@@ -53,7 +53,7 @@ case class MergeIntoPaimonTable(
override val table: FileStoreTable =
v2Table.getTable.asInstanceOf[FileStoreTable]
- lazy val tableSchema: StructType = v2Table.schema()
+ lazy val tableSchema: StructType = v2Table.schema
lazy val filteredTargetPlan: LogicalPlan = {
val filtersOnlyTarget = getExpressionOnlyRelated(mergeCondition,
targetTable)
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
similarity index 95%
rename from
paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
rename to
paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
index 8c879e213..aaaf5867e 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
@@ -21,7 +21,6 @@ package org.apache.paimon.spark.sql
import org.apache.paimon.CoreOptions
import org.apache.paimon.CoreOptions.MergeEngine
import org.apache.paimon.spark.PaimonSparkTestBase
-import org.apache.paimon.spark.catalyst.analysis.Delete
import org.apache.spark.sql.Row
import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}
@@ -355,6 +354,24 @@ abstract class DeleteFromTableTestBase extends
PaimonSparkTestBase {
assertThat(spark.sql("SELECT * FROM `T$audit_log` WHERE
rowkind='-D'").collectAsList().size())
.isEqualTo(3)
}
-}
-class DeleteFromTableTest extends DeleteFromTableTestBase {}
+ test("Paimon Delete: delete null partition with specified default partition
name") {
+ spark.sql(s"""
+ |CREATE TABLE T (a INT, dt STRING)
+ |PARTITIONED BY (dt)
+ |TBLPROPERTIES
('partition.default-name'='__TEST_DEFAULT_PARTITION__')
+ |""".stripMargin)
+
+ spark.sql("INSERT INTO T VALUES (1, '20240601'), (2, null)")
+ checkAnswer(
+ spark.sql("SELECT * FROM T ORDER BY a"),
+ Row(1, "20240601") :: Row(2, null) :: Nil
+ )
+
+ spark.sql("DELETE FROM T WHERE dt IS null")
+ checkAnswer(
+ spark.sql("SELECT * FROM T ORDER BY a"),
+ Row(1, "20240601") :: Nil
+ )
+ }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPartitionManagementTest.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPartitionManagementTest.scala
index 5f4c90cfe..bb24b427f 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPartitionManagementTest.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPartitionManagementTest.scala
@@ -176,4 +176,24 @@ class PaimonPartitionManagementTest extends
PaimonSparkTestBase {
}
}
}
+
+ test("Paimon Partition Management: drop null partition with specified
default partition name") {
+ spark.sql(s"""
+ |CREATE TABLE T (a INT, dt STRING)
+ |PARTITIONED BY (dt)
+ |TBLPROPERTIES
('partition.default-name'='__TEST_DEFAULT_PARTITION__')
+ |""".stripMargin)
+
+ spark.sql("INSERT INTO T VALUES (1, '20240601'), (2, null)")
+ checkAnswer(
+ spark.sql("SHOW PARTITIONS T"),
+ Row("dt=20240601") :: Row("dt=null") :: Nil
+ )
+
+ spark.sql("ALTER TABLE T DROP PARTITION (dt=null)")
+ checkAnswer(
+ spark.sql("SHOW PARTITIONS T"),
+ Row("dt=20240601") :: Nil
+ )
+ }
}