This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f2561b03d3 [spark] Support partition DDL for V1 fallback tables in 
SparkGenericCatalog (#7986)
f2561b03d3 is described below

commit f2561b03d3f5dda3e0b98b9b29d7c5dcd55da7fd
Author: Kerwin Zhang <[email protected]>
AuthorDate: Wed May 27 13:31:26 2026 +0800

    [spark] Support partition DDL for V1 fallback tables in SparkGenericCatalog 
(#7986)
    
    When `SparkGenericCatalog` is configured as a named catalog, for
    example:
    
      ```sql
    
    spark.sql.catalog.hive_metastore=org.apache.paimon.spark.SparkGenericCatalog
    
    and a non-Paimon Hive table is created through the fallback session
    catalog:
    
      CREATE EXTERNAL TABLE hive_metastore.default.test_table (
        id INT,
        name STRING
      )
      USING PARQUET
      PARTITIONED BY (dt STRING)
      LOCATION '...';
    
    ALTER TABLE hive_metastore.default.test_table ADD PARTITION ... fails
    with:
    ```
    
      [INVALID_PARTITION_OPERATION.PARTITION_MANAGEMENT_IS_UNSUPPORTED]
      Table ... does not support partition management.
    
      This happens because hive_metastore is resolved as a V2 catalog. Spark 
returns the fallback Hive table as a V1Table, but V2 partition DDL requires the 
loaded table to implement
      SupportsPartitionManagement. Unlike spark_catalog, Spark does not rewrite 
partition commands for named catalogs through the V1 session catalog command 
path.
---
 .../apache/paimon/spark/SparkGenericCatalog.java   |  10 +-
 .../catalog/SparkV1PartitionManagement.scala       | 182 +++++++++++++++++++++
 .../spark/SparkGenericCatalogWithHiveTest.java     |  81 +++++++++
 3 files changed, 272 insertions(+), 1 deletion(-)

diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
index b89eaf0925..bce949c761 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
@@ -43,6 +43,7 @@ import org.apache.spark.sql.connector.catalog.FunctionCatalog;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.NamespaceChange;
 import org.apache.spark.sql.connector.catalog.PaimonCatalogUtils;
+import org.apache.spark.sql.connector.catalog.SparkV1PartitionManagement;
 import org.apache.spark.sql.connector.catalog.StagedTable;
 import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
@@ -170,10 +171,17 @@ public class SparkGenericCatalog extends SparkBaseCatalog 
implements CatalogExte
         try {
             return sparkCatalog.loadTable(ident);
         } catch (NoSuchTableException e) {
-            return throwsOldIfExceptionHappens(() -> 
asTableCatalog().loadTable(ident), e);
+            return throwsOldIfExceptionHappens(() -> loadFallbackTable(ident), 
e);
         }
     }
 
+    private Table loadFallbackTable(Identifier ident) throws 
NoSuchTableException {
+        Table table = asTableCatalog().loadTable(ident);
+        return "spark_catalog".equals(catalogName)
+                ? table
+                : SparkV1PartitionManagement.wrap(table, getDelegateCatalog());
+    }
+
     @Override
     public Table loadTable(Identifier ident, String version) throws 
NoSuchTableException {
         try {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/SparkV1PartitionManagement.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/SparkV1PartitionManagement.scala
new file mode 100644
index 0000000000..d0a1ed890e
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/SparkV1PartitionManagement.scala
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, 
CatalogTablePartition, CatalogUtils, SessionCatalog}
+import org.apache.spark.sql.catalyst.util.CharVarcharUtils
+import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+import java.util.{Map => JMap}
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+object SparkV1PartitionManagement {
+
+  def wrap(table: Table, delegate: CatalogPlugin): Table = table match {
+    case v1Table: V1Table
+        if v1Table.catalogTable.partitionColumnNames.nonEmpty &&
+          v1Table.catalogTable.tracksPartitionsInCatalog =>
+      sessionCatalog(delegate)
+        .map(new SparkV1PartitionManagement(v1Table.catalogTable, _))
+        .getOrElse(table)
+
+    case _ => table
+  }
+
+  private def sessionCatalog(delegate: CatalogPlugin): Option[SessionCatalog] 
= delegate match {
+    case v2SessionCatalog: V2SessionCatalog =>
+      // V2SessionCatalog does not expose its SessionCatalog, but fallback V1 
partition DDL must
+      // delegate to SessionCatalog to update Hive metastore partition 
metadata. The private field
+      // name is stable in Spark 3.2, 3.3, 3.4, 3.5 and 4.0. Do not cache 
Field across class
+      // loaders; if reflective access fails, fall back to the original 
V1Table.
+      try {
+        val field = v2SessionCatalog.getClass.getDeclaredField("catalog")
+        field.setAccessible(true)
+        Some(field.get(v2SessionCatalog).asInstanceOf[SessionCatalog])
+      } catch {
+        case NonFatal(_) => None
+      }
+
+    case _ => None
+  }
+}
+
+class SparkV1PartitionManagement(catalogTable: CatalogTable, catalog: 
SessionCatalog)
+  extends V1Table(catalogTable)
+  with SupportsAtomicPartitionManagement {
+
+  override lazy val partitionSchema: StructType = catalogTable.partitionSchema
+
+  override def createPartitions(
+      idents: Array[InternalRow],
+      properties: Array[JMap[String, String]]): Unit = {
+    val partitions = idents.zip(properties).map {
+      case (ident, partitionProperties) =>
+        val scalaProperties = partitionProperties.asScala.toMap
+        val location = scalaProperties.get("location")
+        CatalogTablePartition(
+          toPartitionSpec(ident),
+          catalogTable.storage.copy(locationUri = 
location.map(CatalogUtils.stringToURI)),
+          parameters = scalaProperties - "location")
+    }
+    catalog.createPartitions(catalogTable.identifier, partitions, 
ignoreIfExists = false)
+  }
+
+  override def dropPartitions(idents: Array[InternalRow]): Boolean = {
+    catalog.dropPartitions(
+      catalogTable.identifier,
+      idents.map(toPartitionSpec),
+      ignoreIfNotExists = false,
+      purge = false,
+      retainData = false)
+    true
+  }
+
+  override def purgePartitions(idents: Array[InternalRow]): Boolean = {
+    catalog.dropPartitions(
+      catalogTable.identifier,
+      idents.map(toPartitionSpec),
+      ignoreIfNotExists = false,
+      purge = true,
+      retainData = false)
+    true
+  }
+
+  override def renamePartition(from: InternalRow, to: InternalRow): Boolean = {
+    catalog.renamePartitions(
+      catalogTable.identifier,
+      Seq(toPartitionSpec(from)),
+      Seq(toPartitionSpec(to)))
+    true
+  }
+
+  override def replacePartitionMetadata(
+      ident: InternalRow,
+      properties: JMap[String, String]): Unit = {
+    val partition = catalog.getPartition(catalogTable.identifier, 
toPartitionSpec(ident))
+    val scalaProperties = properties.asScala.toMap
+    val storage = scalaProperties.get("location") match {
+      case Some(location) =>
+        partition.storage.copy(locationUri = 
Some(CatalogUtils.stringToURI(location)))
+      case None => partition.storage
+    }
+    catalog.alterPartitions(
+      catalogTable.identifier,
+      Seq(partition.copy(storage = storage, parameters = scalaProperties - 
"location")))
+  }
+
+  override def loadPartitionMetadata(ident: InternalRow): JMap[String, String] 
= {
+    val partition = catalog.getPartition(catalogTable.identifier, 
toPartitionSpec(ident))
+    (partition.parameters ++
+      partition.storage.locationUri.map(uri => "location" -> 
uri.toString)).asJava
+  }
+
+  override def listPartitionIdentifiers(
+      partitionCols: Array[String],
+      ident: InternalRow): Array[InternalRow] = {
+    val partialSpec =
+      if (partitionCols.isEmpty) {
+        None
+      } else {
+        Some(toPartitionSpec(partitionCols, ident))
+      }
+
+    catalog
+      .listPartitions(catalogTable.identifier, partialSpec)
+      .map(_.toRow(partitionSchema, SQLConf.get.sessionLocalTimeZone))
+      .toArray
+  }
+
+  override def partitionExists(ident: InternalRow): Boolean = {
+    try {
+      catalog.getPartition(catalogTable.identifier, toPartitionSpec(ident))
+      true
+    } catch {
+      case _: NoSuchPartitionException => false
+    }
+  }
+
+  override def truncatePartitions(idents: Array[InternalRow]): Boolean = {
+    throw new UnsupportedOperationException("Truncate partition is not 
supported for V1 tables.")
+  }
+
+  private def toPartitionSpec(ident: InternalRow): Map[String, String] = {
+    toPartitionSpec(partitionSchema.fieldNames, ident)
+  }
+
+  private def toPartitionSpec(
+      partitionCols: Array[String],
+      ident: InternalRow): Map[String, String] = {
+    val schema = StructType(partitionCols.map(partitionSchema.apply))
+    val rowConverter = CatalystTypeConverters.createToScalaConverter(
+      CharVarcharUtils.replaceCharVarcharWithString(schema))
+    val row = rowConverter(ident).asInstanceOf[Row]
+    partitionCols.zipWithIndex.map {
+      case (name, index) =>
+        val value = row.get(index)
+        name -> (if (value == null) null else value.toString)
+    }.toMap
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkGenericCatalogWithHiveTest.java
 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkGenericCatalogWithHiveTest.java
index 9e13dd7be5..67282c84a9 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkGenericCatalogWithHiveTest.java
+++ 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkGenericCatalogWithHiveTest.java
@@ -21,6 +21,7 @@ package org.apache.paimon.spark;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.hive.TestHiveMetastore;
 
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
@@ -29,6 +30,7 @@ import org.junit.jupiter.api.io.TempDir;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -48,6 +50,85 @@ public class SparkGenericCatalogWithHiveTest {
         testHiveMetastore.stop();
     }
 
+    @Test
+    public void testAddPartitionForFallbackParquetTableWithNamedCatalog(
+            @TempDir java.nio.file.Path tempDir) throws Exception {
+        Path warehousePath = new Path("file:" + tempDir);
+        String location = 
tempDir.resolve("parquet_part_tbl_path").toFile().getCanonicalPath();
+        SparkSession spark =
+                SparkSession.builder()
+                        .config("spark.sql.warehouse.dir", 
warehousePath.toString())
+                        .config("spark.sql.catalogImplementation", "hive")
+                        .config(
+                                "spark.sql.catalog.hive_metastore",
+                                SparkGenericCatalog.class.getName())
+                        .config(
+                                
"spark.sql.catalog.hive_metastore.catalog.create-underlying-session-catalog",
+                                "true")
+                        
.config("spark.paimon.requiredSparkConfsCheck.enabled", "false")
+                        .master("local[2]")
+                        .getOrCreate();
+
+        try {
+            spark.sql("USE hive_metastore");
+            spark.sql("CREATE DATABASE paimon_partition_db");
+            spark.sql("USE paimon_partition_db");
+            spark.sql(
+                    "CREATE EXTERNAL TABLE parquet_part_tbl1 (id INT, name 
STRING) "
+                            + "USING PARQUET PARTITIONED BY (dt STRING) 
LOCATION '"
+                            + location
+                            + "'");
+            spark.sql(
+                    "CREATE EXTERNAL TABLE parquet_part_tbl2 (id INT, name 
STRING) "
+                            + "USING PARQUET PARTITIONED BY (dt STRING) 
LOCATION '"
+                            + location
+                            + "'");
+            spark.sql(
+                    "INSERT INTO parquet_part_tbl1 PARTITION (dt = 
'2026-05-29') "
+                            + "VALUES (1, 'a'), (2, 'b')");
+
+            assertThat(
+                            spark.sql(
+                                            "SELECT id, name, dt FROM 
spark_catalog.paimon_partition_db.parquet_part_tbl2 "
+                                                    + "WHERE dt = 
'2026-05-29'")
+                                    .collectAsList())
+                    .isEmpty();
+
+            spark.sql(
+                    "ALTER TABLE 
hive_metastore.paimon_partition_db.parquet_part_tbl2 "
+                            + "ADD PARTITION (dt = '2026-05-29') LOCATION '"
+                            + location
+                            + "/dt=2026-05-29'");
+            assertThat(
+                            spark
+                                    .sql(
+                                            "SHOW PARTITIONS 
hive_metastore.paimon_partition_db.parquet_part_tbl2")
+                                    .collectAsList().stream()
+                                    .map(Row::toString))
+                    .containsExactly("[dt=2026-05-29]");
+
+            List<Row> rows =
+                    spark.sql(
+                                    "SELECT id, name, dt FROM 
spark_catalog.paimon_partition_db.parquet_part_tbl2 "
+                                            + "WHERE dt = '2026-05-29' ORDER 
BY id")
+                            .collectAsList();
+            assertThat(rows.stream().map(Row::toString))
+                    .containsExactly("[1,a,2026-05-29]", "[2,b,2026-05-29]");
+
+            spark.sql(
+                    "ALTER TABLE 
hive_metastore.paimon_partition_db.parquet_part_tbl2 "
+                            + "DROP PARTITION (dt = '2026-05-29')");
+            assertThat(
+                            spark.sql(
+                                            "SHOW PARTITIONS 
hive_metastore.paimon_partition_db.parquet_part_tbl2")
+                                    .collectAsList())
+                    .isEmpty();
+        } finally {
+            spark.sql("DROP DATABASE IF EXISTS 
spark_catalog.paimon_partition_db CASCADE");
+            spark.stop();
+        }
+    }
+
     @Test
     public void testBuildWithHive(@TempDir java.nio.file.Path tempDir) throws 
IOException {
         // firstly, we use hive metastore to create table, and check the 
result.

Reply via email to