This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f2561b03d3 [spark] Support partition DDL for V1 fallback tables in
SparkGenericCatalog (#7986)
f2561b03d3 is described below
commit f2561b03d3f5dda3e0b98b9b29d7c5dcd55da7fd
Author: Kerwin Zhang <[email protected]>
AuthorDate: Wed May 27 13:31:26 2026 +0800
[spark] Support partition DDL for V1 fallback tables in SparkGenericCatalog
(#7986)
When `SparkGenericCatalog` is configured as a named catalog, for
example:
```sql
spark.sql.catalog.hive_metastore=org.apache.paimon.spark.SparkGenericCatalog
```
and a non-Paimon Hive table is created through the fallback session
catalog:
```sql
CREATE EXTERNAL TABLE hive_metastore.default.test_table (
  id INT,
  name STRING
)
USING PARQUET
PARTITIONED BY (dt STRING)
LOCATION '...';
```
`ALTER TABLE hive_metastore.default.test_table ADD PARTITION ...` fails
with:
```
[INVALID_PARTITION_OPERATION.PARTITION_MANAGEMENT_IS_UNSUPPORTED]
Table ... does not support partition management.
```
This happens because `hive_metastore` is resolved as a V2 catalog. Spark
returns the fallback Hive table as a `V1Table`, but V2 partition DDL requires
the loaded table to implement `SupportsPartitionManagement`. Unlike
`spark_catalog`, Spark does not rewrite partition commands for named catalogs
through the V1 session catalog command path.
---
.../apache/paimon/spark/SparkGenericCatalog.java | 10 +-
.../catalog/SparkV1PartitionManagement.scala | 182 +++++++++++++++++++++
.../spark/SparkGenericCatalogWithHiveTest.java | 81 +++++++++
3 files changed, 272 insertions(+), 1 deletion(-)
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
index b89eaf0925..bce949c761 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
@@ -43,6 +43,7 @@ import org.apache.spark.sql.connector.catalog.FunctionCatalog;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.NamespaceChange;
import org.apache.spark.sql.connector.catalog.PaimonCatalogUtils;
+import org.apache.spark.sql.connector.catalog.SparkV1PartitionManagement;
import org.apache.spark.sql.connector.catalog.StagedTable;
import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
@@ -170,10 +171,17 @@ public class SparkGenericCatalog extends SparkBaseCatalog
implements CatalogExte
try {
return sparkCatalog.loadTable(ident);
} catch (NoSuchTableException e) {
- return throwsOldIfExceptionHappens(() ->
asTableCatalog().loadTable(ident), e);
+ return throwsOldIfExceptionHappens(() -> loadFallbackTable(ident),
e);
}
}
+ private Table loadFallbackTable(Identifier ident) throws
NoSuchTableException {
+ Table table = asTableCatalog().loadTable(ident);
+ return "spark_catalog".equals(catalogName)
+ ? table
+ : SparkV1PartitionManagement.wrap(table, getDelegateCatalog());
+ }
+
@Override
public Table loadTable(Identifier ident, String version) throws
NoSuchTableException {
try {
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/SparkV1PartitionManagement.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/SparkV1PartitionManagement.scala
new file mode 100644
index 0000000000..d0a1ed890e
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/SparkV1PartitionManagement.scala
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable,
CatalogTablePartition, CatalogUtils, SessionCatalog}
+import org.apache.spark.sql.catalyst.util.CharVarcharUtils
+import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+import java.util.{Map => JMap}
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+object SparkV1PartitionManagement {
+
+ def wrap(table: Table, delegate: CatalogPlugin): Table = table match {
+ case v1Table: V1Table
+ if v1Table.catalogTable.partitionColumnNames.nonEmpty &&
+ v1Table.catalogTable.tracksPartitionsInCatalog =>
+ sessionCatalog(delegate)
+ .map(new SparkV1PartitionManagement(v1Table.catalogTable, _))
+ .getOrElse(table)
+
+ case _ => table
+ }
+
+ private def sessionCatalog(delegate: CatalogPlugin): Option[SessionCatalog]
= delegate match {
+ case v2SessionCatalog: V2SessionCatalog =>
+ // V2SessionCatalog does not expose its SessionCatalog, but fallback V1
partition DDL must
+ // delegate to SessionCatalog to update Hive metastore partition
metadata. The private field
+ // name is stable in Spark 3.2, 3.3, 3.4, 3.5 and 4.0. Do not cache
Field across class
+ // loaders; if reflective access fails, fall back to the original
V1Table.
+ try {
+ val field = v2SessionCatalog.getClass.getDeclaredField("catalog")
+ field.setAccessible(true)
+ Some(field.get(v2SessionCatalog).asInstanceOf[SessionCatalog])
+ } catch {
+ case NonFatal(_) => None
+ }
+
+ case _ => None
+ }
+}
+
+class SparkV1PartitionManagement(catalogTable: CatalogTable, catalog:
SessionCatalog)
+ extends V1Table(catalogTable)
+ with SupportsAtomicPartitionManagement {
+
+ override lazy val partitionSchema: StructType = catalogTable.partitionSchema
+
+ override def createPartitions(
+ idents: Array[InternalRow],
+ properties: Array[JMap[String, String]]): Unit = {
+ val partitions = idents.zip(properties).map {
+ case (ident, partitionProperties) =>
+ val scalaProperties = partitionProperties.asScala.toMap
+ val location = scalaProperties.get("location")
+ CatalogTablePartition(
+ toPartitionSpec(ident),
+ catalogTable.storage.copy(locationUri =
location.map(CatalogUtils.stringToURI)),
+ parameters = scalaProperties - "location")
+ }
+ catalog.createPartitions(catalogTable.identifier, partitions,
ignoreIfExists = false)
+ }
+
+ override def dropPartitions(idents: Array[InternalRow]): Boolean = {
+ catalog.dropPartitions(
+ catalogTable.identifier,
+ idents.map(toPartitionSpec),
+ ignoreIfNotExists = false,
+ purge = false,
+ retainData = false)
+ true
+ }
+
+ override def purgePartitions(idents: Array[InternalRow]): Boolean = {
+ catalog.dropPartitions(
+ catalogTable.identifier,
+ idents.map(toPartitionSpec),
+ ignoreIfNotExists = false,
+ purge = true,
+ retainData = false)
+ true
+ }
+
+ override def renamePartition(from: InternalRow, to: InternalRow): Boolean = {
+ catalog.renamePartitions(
+ catalogTable.identifier,
+ Seq(toPartitionSpec(from)),
+ Seq(toPartitionSpec(to)))
+ true
+ }
+
+ override def replacePartitionMetadata(
+ ident: InternalRow,
+ properties: JMap[String, String]): Unit = {
+ val partition = catalog.getPartition(catalogTable.identifier,
toPartitionSpec(ident))
+ val scalaProperties = properties.asScala.toMap
+ val storage = scalaProperties.get("location") match {
+ case Some(location) =>
+ partition.storage.copy(locationUri =
Some(CatalogUtils.stringToURI(location)))
+ case None => partition.storage
+ }
+ catalog.alterPartitions(
+ catalogTable.identifier,
+ Seq(partition.copy(storage = storage, parameters = scalaProperties -
"location")))
+ }
+
+ override def loadPartitionMetadata(ident: InternalRow): JMap[String, String]
= {
+ val partition = catalog.getPartition(catalogTable.identifier,
toPartitionSpec(ident))
+ (partition.parameters ++
+ partition.storage.locationUri.map(uri => "location" ->
uri.toString)).asJava
+ }
+
+ override def listPartitionIdentifiers(
+ partitionCols: Array[String],
+ ident: InternalRow): Array[InternalRow] = {
+ val partialSpec =
+ if (partitionCols.isEmpty) {
+ None
+ } else {
+ Some(toPartitionSpec(partitionCols, ident))
+ }
+
+ catalog
+ .listPartitions(catalogTable.identifier, partialSpec)
+ .map(_.toRow(partitionSchema, SQLConf.get.sessionLocalTimeZone))
+ .toArray
+ }
+
+ override def partitionExists(ident: InternalRow): Boolean = {
+ try {
+ catalog.getPartition(catalogTable.identifier, toPartitionSpec(ident))
+ true
+ } catch {
+ case _: NoSuchPartitionException => false
+ }
+ }
+
+ override def truncatePartitions(idents: Array[InternalRow]): Boolean = {
+ throw new UnsupportedOperationException("Truncate partition is not
supported for V1 tables.")
+ }
+
+ private def toPartitionSpec(ident: InternalRow): Map[String, String] = {
+ toPartitionSpec(partitionSchema.fieldNames, ident)
+ }
+
+ private def toPartitionSpec(
+ partitionCols: Array[String],
+ ident: InternalRow): Map[String, String] = {
+ val schema = StructType(partitionCols.map(partitionSchema.apply))
+ val rowConverter = CatalystTypeConverters.createToScalaConverter(
+ CharVarcharUtils.replaceCharVarcharWithString(schema))
+ val row = rowConverter(ident).asInstanceOf[Row]
+ partitionCols.zipWithIndex.map {
+ case (name, index) =>
+ val value = row.get(index)
+ name -> (if (value == null) null else value.toString)
+ }.toMap
+ }
+}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkGenericCatalogWithHiveTest.java
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkGenericCatalogWithHiveTest.java
index 9e13dd7be5..67282c84a9 100644
---
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkGenericCatalogWithHiveTest.java
+++
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkGenericCatalogWithHiveTest.java
@@ -21,6 +21,7 @@ package org.apache.paimon.spark;
import org.apache.paimon.fs.Path;
import org.apache.paimon.hive.TestHiveMetastore;
+import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
@@ -29,6 +30,7 @@ import org.junit.jupiter.api.io.TempDir;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -48,6 +50,85 @@ public class SparkGenericCatalogWithHiveTest {
testHiveMetastore.stop();
}
+ @Test
+ public void testAddPartitionForFallbackParquetTableWithNamedCatalog(
+ @TempDir java.nio.file.Path tempDir) throws Exception {
+ Path warehousePath = new Path("file:" + tempDir);
+ String location =
tempDir.resolve("parquet_part_tbl_path").toFile().getCanonicalPath();
+ SparkSession spark =
+ SparkSession.builder()
+ .config("spark.sql.warehouse.dir",
warehousePath.toString())
+ .config("spark.sql.catalogImplementation", "hive")
+ .config(
+ "spark.sql.catalog.hive_metastore",
+ SparkGenericCatalog.class.getName())
+ .config(
+
"spark.sql.catalog.hive_metastore.catalog.create-underlying-session-catalog",
+ "true")
+
.config("spark.paimon.requiredSparkConfsCheck.enabled", "false")
+ .master("local[2]")
+ .getOrCreate();
+
+ try {
+ spark.sql("USE hive_metastore");
+ spark.sql("CREATE DATABASE paimon_partition_db");
+ spark.sql("USE paimon_partition_db");
+ spark.sql(
+ "CREATE EXTERNAL TABLE parquet_part_tbl1 (id INT, name
STRING) "
+ + "USING PARQUET PARTITIONED BY (dt STRING)
LOCATION '"
+ + location
+ + "'");
+ spark.sql(
+ "CREATE EXTERNAL TABLE parquet_part_tbl2 (id INT, name
STRING) "
+ + "USING PARQUET PARTITIONED BY (dt STRING)
LOCATION '"
+ + location
+ + "'");
+ spark.sql(
+ "INSERT INTO parquet_part_tbl1 PARTITION (dt =
'2026-05-29') "
+ + "VALUES (1, 'a'), (2, 'b')");
+
+ assertThat(
+ spark.sql(
+ "SELECT id, name, dt FROM
spark_catalog.paimon_partition_db.parquet_part_tbl2 "
+ + "WHERE dt =
'2026-05-29'")
+ .collectAsList())
+ .isEmpty();
+
+ spark.sql(
+ "ALTER TABLE
hive_metastore.paimon_partition_db.parquet_part_tbl2 "
+ + "ADD PARTITION (dt = '2026-05-29') LOCATION '"
+ + location
+ + "/dt=2026-05-29'");
+ assertThat(
+ spark
+ .sql(
+ "SHOW PARTITIONS
hive_metastore.paimon_partition_db.parquet_part_tbl2")
+ .collectAsList().stream()
+ .map(Row::toString))
+ .containsExactly("[dt=2026-05-29]");
+
+ List<Row> rows =
+ spark.sql(
+ "SELECT id, name, dt FROM
spark_catalog.paimon_partition_db.parquet_part_tbl2 "
+ + "WHERE dt = '2026-05-29' ORDER
BY id")
+ .collectAsList();
+ assertThat(rows.stream().map(Row::toString))
+ .containsExactly("[1,a,2026-05-29]", "[2,b,2026-05-29]");
+
+ spark.sql(
+ "ALTER TABLE
hive_metastore.paimon_partition_db.parquet_part_tbl2 "
+ + "DROP PARTITION (dt = '2026-05-29')");
+ assertThat(
+ spark.sql(
+ "SHOW PARTITIONS
hive_metastore.paimon_partition_db.parquet_part_tbl2")
+ .collectAsList())
+ .isEmpty();
+ } finally {
+ spark.sql("DROP DATABASE IF EXISTS
spark_catalog.paimon_partition_db CASCADE");
+ spark.stop();
+ }
+ }
+
@Test
public void testBuildWithHive(@TempDir java.nio.file.Path tempDir) throws
IOException {
// firstly, we use hive metastore to create table, and check the
result.