This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ba53392df [spark] Format table support show partitions (#5798)
8ba53392df is described below

commit 8ba53392df5fb473667e1e910b6a5e79fb565f7a
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Jun 26 22:06:41 2025 +0800

    [spark] Format table support show partitions (#5798)
---
 .../spark/sql/execution/PaimonFormatTable.scala    | 79 ++++++++++++++++------
 .../paimon/spark/sql/FormatTableTestBase.scala     | 20 ++++++
 2 files changed, 80 insertions(+), 19 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala
index 842009b4a5..8e6eea0127 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala
@@ -20,6 +20,9 @@ package org.apache.spark.sql.execution
 
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
+import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.v2.csv.CSVTable
 import org.apache.spark.sql.execution.datasources.v2.json.JsonTable
@@ -29,6 +32,8 @@ import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFile
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
+import java.util
+
 import scala.collection.JavaConverters._
 
 object PaimonFormatTable {
@@ -108,6 +113,45 @@ object PaimonFormatTable {
 }
 
 // Paimon Format Table
+trait PartitionedFormatTable extends SupportsPartitionManagement {
+
+  val partitionSchema_ : StructType
+
+  val fileIndex: PartitioningAwareFileIndex
+
+  override def partitionSchema(): StructType = partitionSchema_
+
+  override def listPartitionIdentifiers(
+      names: Array[String],
+      ident: InternalRow): Array[InternalRow] = {
+    val partitionFilters = names.zipWithIndex.map {
+      case (name, index) =>
+        val f = partitionSchema().apply(name)
+        EqualTo(
+          AttributeReference(f.name, f.dataType, f.nullable)(),
+          Literal(ident.get(index, f.dataType), f.dataType))
+    }.toSeq
+    fileIndex.listFiles(partitionFilters, Seq.empty).map(_.values).toArray
+  }
+
+  override def createPartition(ident: InternalRow, properties: util.Map[String, String]): Unit = {
+    throw new UnsupportedOperationException()
+  }
+
+  override def dropPartition(ident: InternalRow): Boolean = {
+    throw new UnsupportedOperationException()
+  }
+
+  override def replacePartitionMetadata(
+      ident: InternalRow,
+      properties: util.Map[String, String]): Unit = {
+    throw new UnsupportedOperationException()
+  }
+
+  override def loadPartitionMetadata(ident: InternalRow): util.Map[String, String] = {
+    Map.empty[String, String].asJava
+  }
+}
 
 class PartitionedCSVTable(
     name: String,
@@ -116,16 +160,16 @@ class PartitionedCSVTable(
     paths: Seq[String],
     userSpecifiedSchema: Option[StructType],
     fallbackFileFormat: Class[_ <: FileFormat],
-    partitionSchema: StructType
-) extends CSVTable(name, sparkSession, options, paths, userSpecifiedSchema, fallbackFileFormat) {
-
+    override val partitionSchema_ : StructType)
+  extends CSVTable(name, sparkSession, options, paths, userSpecifiedSchema, fallbackFileFormat)
+  with PartitionedFormatTable {
   override lazy val fileIndex: PartitioningAwareFileIndex = {
     PaimonFormatTable.createFileIndex(
       options,
       sparkSession,
       paths,
       userSpecifiedSchema,
-      partitionSchema)
+      partitionSchema())
   }
 }
 
@@ -136,8 +180,9 @@ class PartitionedOrcTable(
     paths: Seq[String],
     userSpecifiedSchema: Option[StructType],
     fallbackFileFormat: Class[_ <: FileFormat],
-    partitionSchema: StructType
-) extends OrcTable(name, sparkSession, options, paths, userSpecifiedSchema, fallbackFileFormat) {
+    override val partitionSchema_ : StructType
+) extends OrcTable(name, sparkSession, options, paths, userSpecifiedSchema, fallbackFileFormat)
+  with PartitionedFormatTable {
 
   override lazy val fileIndex: PartitioningAwareFileIndex = {
     PaimonFormatTable.createFileIndex(
@@ -145,7 +190,7 @@ class PartitionedOrcTable(
       sparkSession,
       paths,
       userSpecifiedSchema,
-      partitionSchema)
+      partitionSchema())
   }
 }
 
@@ -156,14 +201,9 @@ class PartitionedParquetTable(
     paths: Seq[String],
     userSpecifiedSchema: Option[StructType],
     fallbackFileFormat: Class[_ <: FileFormat],
-    partitionSchema: StructType
-) extends ParquetTable(
-    name,
-    sparkSession,
-    options,
-    paths,
-    userSpecifiedSchema,
-    fallbackFileFormat) {
+    override val partitionSchema_ : StructType
+) extends ParquetTable(name, sparkSession, options, paths, userSpecifiedSchema, fallbackFileFormat)
+  with PartitionedFormatTable {
 
   override lazy val fileIndex: PartitioningAwareFileIndex = {
     PaimonFormatTable.createFileIndex(
@@ -171,7 +211,7 @@ class PartitionedParquetTable(
       sparkSession,
       paths,
       userSpecifiedSchema,
-      partitionSchema)
+      partitionSchema())
   }
 }
 
@@ -182,8 +222,9 @@ class PartitionedJsonTable(
     paths: Seq[String],
     userSpecifiedSchema: Option[StructType],
     fallbackFileFormat: Class[_ <: FileFormat],
-    partitionSchema: StructType)
-  extends JsonTable(name, sparkSession, options, paths, userSpecifiedSchema, fallbackFileFormat) {
+    override val partitionSchema_ : StructType)
+  extends JsonTable(name, sparkSession, options, paths, userSpecifiedSchema, fallbackFileFormat)
+  with PartitionedFormatTable {
 
   override lazy val fileIndex: PartitioningAwareFileIndex = {
     PaimonFormatTable.createFileIndex(
@@ -191,6 +232,6 @@ class PartitionedJsonTable(
       sparkSession,
       paths,
       userSpecifiedSchema,
-      partitionSchema)
+      partitionSchema())
   }
 }
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
index 56c14aa64b..8c5b96a7d6 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
@@ -58,6 +58,26 @@ abstract class FormatTableTestBase extends PaimonHiveTestBase {
     }
   }
 
+  test("Format table: show partitions") {
+    for (format <- Seq("csv", "orc", "parquet", "json")) {
+      withTable("t") {
+        sql(s"CREATE TABLE t (id INT, p1 INT, p2 STRING) USING $format PARTITIONED BY (p1, p2)")
+        sql("INSERT INTO t VALUES (1, 1, '1')")
+        sql("INSERT INTO t VALUES (2, 1, '1')")
+        sql("INSERT INTO t VALUES (3, 2, '1')")
+        sql("INSERT INTO t VALUES (3, 2, '2')")
+
+        checkAnswer(
+          spark.sql("SHOW PARTITIONS T"),
+          Seq(Row("p1=1/p2=1"), Row("p1=2/p2=1"), Row("p1=2/p2=2")))
+
+        checkAnswer(spark.sql("SHOW PARTITIONS T PARTITION (p1=1)"), Seq(Row("p1=1/p2=1")))
+
+        checkAnswer(spark.sql("SHOW PARTITIONS T PARTITION (p1=2, p2='2')"), Seq(Row("p1=2/p2=2")))
+      }
+    }
+  }
+
   test("Format table: CTAS with partitioned table") {
     withTable("t1", "t2") {
       sql("CREATE TABLE t1 (id INT, p1 INT, p2 INT) USING csv PARTITIONED BY (p1, p2)")

Reply via email to