This is an automated email from the ASF dual-hosted git repository.

xuzifu666 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 300cc67c20 [spark] show table extended (#4603)
300cc67c20 is described below

commit 300cc67c208c4b86e2edf58ad1981b86649fe892
Author: Yann Byron <[email protected]>
AuthorDate: Tue Dec 3 11:35:16 2024 +0800

    [spark] show table extended (#4603)
    
    * [spark] show table extended
    
    * update
    
    * [update] doc
---
 docs/content/spark/auxiliary.md                    |  11 ++
 .../paimon/spark/PaimonPartitionManagement.scala   |   2 +-
 .../analysis/PaimonResolvePartitionSpec.scala      |  75 +++++++++++++
 .../extensions/PaimonSparkSessionExtensions.scala  |   2 +
 .../scala/org/apache/spark/sql/PaimonUtils.scala   |  19 ++++
 ...logUtils.scala => PaimonCatalogImplicits.scala} |  25 +----
 .../sql/connector/catalog/PaimonCatalogUtils.scala |   3 +
 .../apache/spark/sql/paimon/shims/SparkShim.scala  |   4 +
 .../paimon/spark/sql/DescribeTableTest.scala       |  70 ++++++++++++
 .../catalyst/analysis/Spark3ResolutionRules.scala  |  56 ++++++++++
 .../commands/PaimonShowTablePartitionCommand.scala |  96 ++++++++++++++++
 .../commands/PaimonShowTablesExtendedCommand.scala | 123 +++++++++++++++++++++
 .../apache/spark/sql/paimon/shims/Spark3Shim.scala |   8 +-
 .../catalyst/analysis/Spark4ResolutionRules.scala} |  26 +----
 .../apache/spark/sql/paimon/shims/Spark4Shim.scala |   9 +-
 15 files changed, 486 insertions(+), 43 deletions(-)

diff --git a/docs/content/spark/auxiliary.md b/docs/content/spark/auxiliary.md
index 6330ca27ce..5de0289565 100644
--- a/docs/content/spark/auxiliary.md
+++ b/docs/content/spark/auxiliary.md
@@ -96,6 +96,17 @@ SHOW PARTITIONS my_table;
 SHOW PARTITIONS my_table PARTITION (dt=20230817);
 ```
 
+## Show Table Extended
+The SHOW TABLE EXTENDED statement is used to list table or partition 
information.
+
+```sql
+-- Lists tables whose names match the given pattern
+SHOW TABLE EXTENDED IN db_name LIKE 'test*';
+
+-- Lists the specified partition information for the table
+SHOW TABLE EXTENDED IN db_name LIKE 'table_name' PARTITION(pt = '2024');
+```
+
 ## Analyze table
 
 The ANALYZE TABLE statement collects statistics about the table, that are to 
be used by the query optimizer to find a better query execution plan.
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
index 9a305ca59a..840f1341a6 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
@@ -100,7 +100,7 @@ trait PaimonPartitionManagement extends 
SupportsAtomicPartitionManagement {
   }
 
   override def loadPartitionMetadata(ident: InternalRow): JMap[String, String] 
= {
-    throw new UnsupportedOperationException("Load partition is not supported")
+    Map.empty[String, String].asJava
   }
 
   override def listPartitionIdentifiers(
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonResolvePartitionSpec.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonResolvePartitionSpec.scala
new file mode 100644
index 0000000000..5d6a5a063c
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonResolvePartitionSpec.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.catalyst.analysis
+
+import org.apache.spark.sql.PaimonUtils.{normalizePartitionSpec, 
requireExactMatchedPartitionSpec}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.{PartitionSpec, 
ResolvedPartitionSpec, UnresolvedPartitionSpec}
+import org.apache.spark.sql.catalyst.analysis.ResolvePartitionSpec.conf
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
+import org.apache.spark.sql.catalyst.util.CharVarcharUtils
+import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+
+/** Resolves a [[PartitionSpec]] against the partition schema of a table loaded from a catalog. */
+object PaimonResolvePartitionSpec {
+
+  /**
+   * Loads `tableIndent` from `catalog` (it must be partitionable) and returns `partitionSpec` in
+   * resolved form.
+   *
+   * An [[UnresolvedPartitionSpec]] is normalized against the table's partition schema and checked
+   * with `requireExactMatchedPartitionSpec`; an already resolved spec is returned unchanged. The
+   * table is loaded in both cases.
+   */
+  def resolve(
+      catalog: TableCatalog,
+      tableIndent: Identifier,
+      partitionSpec: PartitionSpec): ResolvedPartitionSpec = {
+    val table = catalog.loadTable(tableIndent).asPartitionable
+    partitionSpec match {
+      case unresolved: UnresolvedPartitionSpec =>
+        resolvePartitionSpec(
+          table.name(),
+          unresolved,
+          table.partitionSchema(),
+          allowPartitionSpec = false)
+      // Typed pattern instead of an unchecked `asInstanceOf` downcast.
+      case resolved: ResolvedPartitionSpec => resolved
+    }
+  }
+
+  /**
+   * Normalizes the column names of `partSpec` against `partSchema` and converts its values into a
+   * partition identifier row. Unless `allowPartitionSpec` is set, the spec is additionally checked
+   * with `requireExactMatchedPartitionSpec` against all partition column names.
+   */
+  private def resolvePartitionSpec(
+      tableName: String,
+      partSpec: UnresolvedPartitionSpec,
+      partSchema: StructType,
+      allowPartitionSpec: Boolean): ResolvedPartitionSpec = {
+    val normalizedSpec = normalizePartitionSpec(partSpec.spec, partSchema, tableName, conf.resolver)
+    if (!allowPartitionSpec) {
+      requireExactMatchedPartitionSpec(tableName, normalizedSpec, partSchema.fieldNames)
+    }
+    // Keep the schema's column order, restricted to the columns named in the spec.
+    val partitionNames = normalizedSpec.keySet
+    val requestedFields = partSchema.filter(field => partitionNames.contains(field.name))
+    ResolvedPartitionSpec(
+      requestedFields.map(_.name),
+      convertToPartIdent(normalizedSpec, requestedFields),
+      partSpec.location)
+  }
+
+  /**
+   * Builds the partition identifier row for `schema`: each value of `partitionSpec` is read as a
+   * string literal and cast to the column's type (char/varchar replaced by string) using the
+   * session time zone. A column missing from the spec is cast from a null literal.
+   */
+  def convertToPartIdent(
+      partitionSpec: TablePartitionSpec,
+      schema: Seq[StructField]): InternalRow = {
+    val partValues = schema.map {
+      part =>
+        val raw = partitionSpec.get(part.name).orNull
+        val dt = CharVarcharUtils.replaceCharVarcharWithString(part.dataType)
+        Cast(Literal.create(raw, StringType), dt, Some(conf.sessionLocalTimeZone)).eval()
+    }
+    InternalRow.fromSeq(partValues)
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
index e8f75d394a..f73df64fb8 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
@@ -40,6 +40,8 @@ class PaimonSparkSessionExtensions extends 
(SparkSessionExtensions => Unit) {
     extensions.injectResolutionRule(spark => new PaimonAnalysis(spark))
     extensions.injectResolutionRule(spark => PaimonProcedureResolver(spark))
     extensions.injectResolutionRule(spark => PaimonViewResolver(spark))
+    extensions.injectResolutionRule(
+      spark => SparkShimLoader.getSparkShim.createCustomResolution(spark))
     extensions.injectResolutionRule(spark => 
PaimonIncompatibleResolutionRules(spark))
 
     extensions.injectPostHocResolutionRule(spark => 
PaimonPostHocResolutionRules(spark))
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
index 4492d856ad..cc49e787dc 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
@@ -20,11 +20,15 @@ package org.apache.spark.sql
 
 import org.apache.spark.executor.OutputMetrics
 import org.apache.spark.rdd.InputFileBlockHolder
+import org.apache.spark.sql.catalyst.analysis.Resolver
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.connector.expressions.{FieldReference, 
NamedReference}
 import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.PartitioningUtils
 import org.apache.spark.util.{Utils => SparkUtils}
 
 /**
@@ -87,4 +91,19 @@ object PaimonUtils {
     outputMetrics.setBytesWritten(bytesWritten)
     outputMetrics.setRecordsWritten(recordsWritten)
   }
+
+  /** Delegates to Spark's `PartitioningUtils.normalizePartitionSpec` with the same arguments. */
+  def normalizePartitionSpec[T](
+      partitionSpec: Map[String, T],
+      partCols: StructType,
+      tblName: String,
+      resolver: Resolver): Map[String, T] =
+    PartitioningUtils.normalizePartitionSpec(partitionSpec, partCols, tblName, resolver)
+
+  /** Delegates to Spark's `PartitioningUtils.requireExactMatchedPartitionSpec`. */
+  def requireExactMatchedPartitionSpec(
+      tableName: String,
+      spec: TablePartitionSpec,
+      partitionColumnNames: Seq[String]): Unit =
+    PartitioningUtils.requireExactMatchedPartitionSpec(tableName, spec, partitionColumnNames)
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogUtils.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogImplicits.scala
similarity index 51%
copy from 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogUtils.scala
copy to 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogImplicits.scala
index 2ab3dc4945..f1f20fb6fb 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogUtils.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogImplicits.scala
@@ -18,26 +18,13 @@
 
 package org.apache.spark.sql.connector.catalog
 
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.catalog.ExternalCatalog
-import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
-import org.apache.spark.sql.paimon.ReflectUtils
+object PaimonCatalogImplicits {
 
-object PaimonCatalogUtils {
+  import CatalogV2Implicits._
 
-  def buildExternalCatalog(conf: SparkConf, hadoopConf: Configuration): 
ExternalCatalog = {
-    val externalCatalogClassName =
-      if 
(SparkSession.active.conf.get(CATALOG_IMPLEMENTATION.key).equals("hive")) {
-        "org.apache.spark.sql.hive.HiveExternalCatalog"
-      } else {
-        "org.apache.spark.sql.catalyst.catalog.InMemoryCatalog"
-      }
-    ReflectUtils.reflect[ExternalCatalog, SparkConf, Configuration](
-      externalCatalogClassName,
-      conf,
-      hadoopConf)
-  }
+  implicit class PaimonCatalogHelper(plugin: CatalogPlugin) extends 
CatalogHelper(plugin)
 
+  implicit class PaimonNamespaceHelper(namespace: Array[String]) extends 
NamespaceHelper(namespace)
+
+//  implicit class PaimonTableHelper(table: Table) extends TableHelper(table)
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogUtils.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogUtils.scala
index 2ab3dc4945..5db6894ba0 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogUtils.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogUtils.scala
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalog
+import org.apache.spark.sql.connector.catalog.CatalogV2Util
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.paimon.ReflectUtils
 
@@ -40,4 +41,6 @@ object PaimonCatalogUtils {
       hadoopConf)
   }
 
+  val TABLE_RESERVED_PROPERTIES: Seq[String] = 
CatalogV2Util.TABLE_RESERVED_PROPERTIES
+
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
index bd85282737..334bd6e931 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
@@ -24,6 +24,8 @@ import org.apache.paimon.types.{DataType, RowType}
 import org.apache.spark.sql.{Column, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.types.StructType
@@ -39,6 +41,8 @@ trait SparkShim {
 
   def createSparkParser(delegate: ParserInterface): ParserInterface
 
+  def createCustomResolution(spark: SparkSession): Rule[LogicalPlan]
+
   def createSparkInternalRow(rowType: RowType): SparkInternalRow
 
   def createSparkArrayData(elementType: DataType): SparkArrayData
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTest.scala
index 528dcd3cd1..ae538fa48c 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTest.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTest.scala
@@ -27,6 +27,76 @@ import java.util.Objects
 
 class DescribeTableTest extends PaimonSparkTestBase {
 
+  // Covers SHOW TABLE EXTENDED for a namespace-wide listing, the details of a single table, and
+  // a single partition (including a missing partition and a partially specified one).
+  test("Paimon show: show table extended") {
+    val testDB = "test_show"
+    withDatabase(testDB) {
+      spark.sql("CREATE TABLE s1 (id INT)")
+
+      spark.sql(s"CREATE DATABASE $testDB")
+      spark.sql(s"USE $testDB")
+      spark.sql("CREATE TABLE s2 (id INT, pt STRING) PARTITIONED BY (pt)")
+      spark.sql("CREATE TABLE s3 (id INT, pt1 STRING, pt2 STRING) PARTITIONED BY (pt1, pt2)")
+
+      spark.sql("INSERT INTO s2 VALUES (1, '2024'), (2, '2024'), (3, '2025'), (4, '2026')")
+      spark.sql("""
+                  |INSERT INTO s3
+                  |VALUES
+                  |(1, '2024', '11'), (2, '2024', '12'), (3, '2025', '11'), (4, '2025', '12')
+                  |""".stripMargin)
+
+      // SHOW TABLE EXTENDED will give four columns: namespace, tableName, isTemporary, information.
+      checkAnswer(
+        sql(s"SHOW TABLE EXTENDED IN $dbName0 LIKE '*'")
+          .select("namespace", "tableName", "isTemporary"),
+        Row("test", "s1", false))
+      checkAnswer(
+        sql(s"SHOW TABLE EXTENDED IN $testDB LIKE '*'")
+          .select("namespace", "tableName", "isTemporary"),
+        Row(testDB, "s2", false) :: Row(testDB, "s3", false) :: Nil
+      )
+
+      // check table s2
+      val res1 = spark.sql(s"SHOW TABLE EXTENDED IN $testDB LIKE 's2'").select("information")
+      Assertions.assertEquals(1, res1.count())
+      // The information column holds one "key: value" pair per line.
+      val information1 = res1
+        .collect()
+        .head
+        .getString(0)
+        .split("\n")
+        .map {
+          line =>
+            val kv = line.split(": ", 2)
+            kv(0) -> kv(1)
+        }
+        .toMap
+      // assertEquals takes (expected, actual); keep that order so failure messages read correctly.
+      Assertions.assertEquals("paimon", information1("Catalog"))
+      Assertions.assertEquals(testDB, information1("Namespace"))
+      Assertions.assertEquals("s2", information1("Table"))
+      Assertions.assertEquals("paimon", information1("Provider"))
+      Assertions.assertEquals(loadTable(testDB, "s2").location().toString, information1("Location"))
+
+      // check partition info of tables s2 and s3
+      val error1 = intercept[Exception] {
+        spark.sql(s"SHOW TABLE EXTENDED IN $testDB LIKE 's2' PARTITION(pt='2022')")
+      }.getMessage
+      assert(error1.contains("PARTITIONS_NOT_FOUND"))
+
+      val error2 = intercept[Exception] {
+        spark.sql(s"SHOW TABLE EXTENDED IN $testDB LIKE 's3' PARTITION(pt1='2024')")
+      }.getMessage
+      assert(error2.contains("Partition spec is invalid"))
+
+      val res2 =
+        spark.sql(s"SHOW TABLE EXTENDED IN $testDB LIKE 's3' PARTITION(pt1 = '2024', pt2 = 11)")
+      checkAnswer(
+        res2.select("namespace", "tableName", "isTemporary"),
+        Row(testDB, "s3", false)
+      )
+      Assertions.assertTrue(
+        res2.select("information").collect().head.getString(0).contains("Partition Values"))
+    }
+  }
+
   test(s"Paimon describe: describe table comment") {
     var comment = "test comment"
     spark.sql(s"""
diff --git 
a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/Spark3ResolutionRules.scala
 
b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/Spark3ResolutionRules.scala
new file mode 100644
index 0000000000..924df2d1e3
--- /dev/null
+++ 
b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/Spark3ResolutionRules.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.catalyst.analysis
+
+import org.apache.paimon.spark.commands.{PaimonShowTablePartitionCommand, 
PaimonShowTablesExtendedCommand}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.catalyst.analysis.{PartitionSpec, 
ResolvedNamespace, UnresolvedPartitionSpec}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
ShowTableExtended}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.catalog.Identifier
+
+/**
+ * Spark 3 analyzer rule that rewrites a `ShowTableExtended` plan over a resolved namespace into
+ * [[PaimonShowTablePartitionCommand]] when an unresolved partition spec is present, and into
+ * [[PaimonShowTablesExtendedCommand]] when there is none.
+ *
+ * NOTE(review): the pattern does not check which catalog the namespace belongs to - confirm that
+ * `asTableCatalog` is safe for every catalog this rule can see.
+ */
+case class Spark3ResolutionRules(session: SparkSession)
+  extends Rule[LogicalPlan]
+  with SQLConfHelper {
+
+  import org.apache.spark.sql.connector.catalog.PaimonCatalogImplicits._
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDown {
+    case ShowTableExtended(
+          ResolvedNamespace(catalog, ns),
+          pattern,
+          partitionSpec @ (None | Some(UnresolvedPartitionSpec(_, _))),
+          output) =>
+      partitionSpec match {
+        case Some(spec: PartitionSpec) =>
+          // With a partition spec, `pattern` is used verbatim as the table name.
+          val tableIdent = Identifier.of(ns.toArray, pattern)
+          val tableCatalog = catalog.asTableCatalog
+          val resolvedSpec = PaimonResolvePartitionSpec.resolve(tableCatalog, tableIdent, spec)
+          PaimonShowTablePartitionCommand(output, tableCatalog, tableIdent, resolvedSpec)
+        case None =>
+          PaimonShowTablesExtendedCommand(catalog.asTableCatalog, ns, pattern, output)
+      }
+  }
+
+}
diff --git 
a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablePartitionCommand.scala
 
b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablePartitionCommand.scala
new file mode 100644
index 0000000000..32f9498585
--- /dev/null
+++ 
b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablePartitionCommand.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.commands
+
+import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
+
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.analysis.ResolvedPartitionSpec
+import 
org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
+import org.apache.spark.sql.catalyst.expressions.{Attribute, ToPrettyString}
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.connector.catalog.{Identifier, 
SupportsPartitionManagement, TableCatalog}
+import org.apache.spark.sql.connector.catalog.PaimonCatalogImplicits._
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/**
+ * Runnable command that reports a single partition of `tableIndent`, identified by `partSpec`.
+ * It returns exactly one row: (namespace, tableName, isTemporary = false, information).
+ */
+case class PaimonShowTablePartitionCommand(
+    override val output: Seq[Attribute],
+    catalog: TableCatalog,
+    tableIndent: Identifier,
+    partSpec: ResolvedPartitionSpec)
+  extends PaimonLeafRunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val partitionable = catalog.loadTable(tableIndent).asPartitionable
+    val information = getTablePartitionDetails(tableIndent, partitionable, partSpec)
+    Seq(Row(tableIndent.namespace.quoted, tableIndent.name(), false, s"$information\n"))
+  }
+
+  /**
+   * Renders the "Partition Values" entry for the partition matching `partSpec`.
+   *
+   * Throws a `RuntimeException` tagged `[PARTITIONS_NOT_FOUND]` when the table lists no partition
+   * for the spec.
+   */
+  private def getTablePartitionDetails(
+      tableIdent: Identifier,
+      partitionTable: SupportsPartitionManagement,
+      partSpec: ResolvedPartitionSpec): String = {
+    val partitionSchema = partitionTable.partitionSchema()
+    val matched = partitionTable.listPartitionIdentifiers(partSpec.names.toArray, partSpec.ident)
+    if (matched.isEmpty) {
+      val part = partSpec.ident
+        .toSeq(partitionSchema)
+        .zip(partitionSchema.map(_.name))
+        .map { case (value, name) => s"$name = $value" }
+        .mkString(", ")
+      throw new RuntimeException(
+        s"""
+           |[PARTITIONS_NOT_FOUND] The partition(s) PARTITION ($part) cannot be found in table ${tableIdent.toString}.
+           |Verify the partition specification and table name.
+           |""".stripMargin)
+    }
+    assert(matched.length == 1)
+
+    // "Partition Values": one escaped `name=value` entry per partition column.
+    val partitionRow = matched.head
+    val timeZoneId = conf.sessionLocalTimeZone
+    val partitionValues = partitionSchema.fields.zipWithIndex
+      .map {
+        case (field, i) =>
+          val literal = Literal(partitionRow.get(i, field.dataType), field.dataType)
+          val pretty = ToPrettyString(literal, Some(timeZoneId)).eval(null)
+          val valueStr = if (pretty == null) "null" else pretty.toString
+          escapePathName(field.name) + "=" + escapePathName(valueStr)
+      }
+      .mkString("[", ", ", "]")
+
+    // TODO "Partition Parameters", "Created Time", "Last Access", "Partition Statistics"
+    val details = Seq("Partition Values" -> partitionValues)
+
+    details
+      .map { case (key, value) => if (value.isEmpty) key else s"$key: $value" }
+      .mkString("\n")
+  }
+}
diff --git 
a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablesExtendedCommand.scala
 
b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablesExtendedCommand.scala
new file mode 100644
index 0000000000..b393982e25
--- /dev/null
+++ 
b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablesExtendedCommand.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.commands
+
+import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
+
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.util.{QuotingUtils, StringUtils}
+import org.apache.spark.sql.connector.catalog.{Identifier, PaimonCatalogUtils, 
SupportsPartitionManagement, Table, TableCatalog}
+import org.apache.spark.sql.connector.catalog.PaimonCatalogImplicits._
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/**
+ * Runnable command that lists the tables of `namespace` whose name matches `pattern`, returning
+ * one row per table: (namespace, tableName, isTemporary = false, information).
+ *
+ * `isExtended` and `partitionSpec` are accepted but not read by this command.
+ */
+case class PaimonShowTablesExtendedCommand(
+    catalog: TableCatalog,
+    namespace: Seq[String],
+    pattern: String,
+    override val output: Seq[Attribute],
+    isExtended: Boolean = false,
+    partitionSpec: Option[TablePartitionSpec] = None)
+  extends PaimonLeafRunnableCommand {
+
+  override def run(spark: SparkSession): Seq[Row] = {
+    // TODO: view
+    // Build the rows as a filter + map instead of using `map` only for its side effects.
+    catalog
+      .listTables(namespace.toArray)
+      .filter(ident => StringUtils.filterPattern(Seq(ident.name()), pattern).nonEmpty)
+      .map {
+        ident =>
+          val table = catalog.loadTable(ident)
+          val information = getTableDetails(catalog.name, ident, table)
+          Row(ident.namespace().quoted, ident.name(), false, s"$information\n")
+      }
+      .toSeq
+  }
+
+  /**
+   * Renders the information column for one table as "key: value" lines, in insertion order:
+   * catalog, namespace, table, type, reserved properties, remaining table properties, partition
+   * details and schema. An entry with an empty value is rendered as the key alone.
+   */
+  private def getTableDetails(catalogName: String, identifier: Identifier, table: Table): String = {
+    val results = new mutable.LinkedHashMap[String, String]()
+    val reservedKeys = PaimonCatalogUtils.TABLE_RESERVED_PROPERTIES
+
+    results.put("Catalog", catalogName)
+    results.put("Namespace", identifier.namespace().quoted)
+    results.put("Table", identifier.name())
+    val tableType =
+      if (table.properties().containsKey(TableCatalog.PROP_EXTERNAL)) CatalogTableType.EXTERNAL
+      else CatalogTableType.MANAGED
+    results.put("Type", tableType.name)
+
+    // Reserved properties get their own capitalized entry; "external" is already covered by Type.
+    reservedKeys
+      .filterNot(_ == TableCatalog.PROP_EXTERNAL)
+      .filter(propKey => table.properties.containsKey(propKey))
+      .foreach(propKey => results.put(propKey.capitalize, table.properties.get(propKey)))
+
+    val properties: Seq[String] =
+      conf
+        .redactOptions(table.properties.asScala.toMap)
+        .toList
+        .filter { case (key, _) => !reservedKeys.contains(key) }
+        .sortBy(_._1)
+        .map { case (key, value) => s"$key=$value" }
+    if (!table.properties().isEmpty) {
+      results.put("Table Properties", properties.mkString("[", ", ", "]"))
+    }
+
+    // Partition Provider & Partition Columns
+    table match {
+      case partitionable: SupportsPartitionManagement =>
+        val partitionSchema = partitionable.partitionSchema()
+        if (partitionSchema.nonEmpty) {
+          results.put("Partition Provider", "Catalog")
+          results.put(
+            "Partition Columns",
+            partitionSchema
+              .map(field => QuotingUtils.quoteIdentifier(field.name))
+              .mkString("[", ", ", "]"))
+        }
+      case _ =>
+    }
+
+    if (table.schema().nonEmpty) {
+      results.put("Schema", table.schema().treeString)
+    }
+
+    results
+      .map { case (key, value) => if (value.isEmpty) key else s"$key: $value" }
+      .mkString("\n")
+  }
+
+}
diff --git 
a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
 
b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
index 57d79d6474..f508e2605c 100644
--- 
a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
+++ 
b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
@@ -18,6 +18,7 @@
 
 package org.apache.spark.sql.paimon.shims
 
+import org.apache.paimon.spark.catalyst.analysis.Spark3ResolutionRules
 import 
org.apache.paimon.spark.catalyst.parser.extensions.PaimonSpark3SqlExtensionsParser
 import org.apache.paimon.spark.data.{Spark3ArrayData, Spark3InternalRow, 
SparkArrayData, SparkInternalRow}
 import org.apache.paimon.types.{DataType, RowType}
@@ -25,7 +26,8 @@ import org.apache.paimon.types.{DataType, RowType}
 import org.apache.spark.sql.{Column, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.Aggregate
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.types.StructType
@@ -38,6 +40,10 @@ class Spark3Shim extends SparkShim {
     new PaimonSpark3SqlExtensionsParser(delegate)
   }
 
+  override def createCustomResolution(spark: SparkSession): Rule[LogicalPlan] = {
+    Spark3ResolutionRules(spark)
+  }
+
   override def createSparkInternalRow(rowType: RowType): SparkInternalRow = {
     new Spark3InternalRow(rowType)
   }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogUtils.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/Spark4ResolutionRules.scala
similarity index 50%
copy from paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogUtils.scala
copy to paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/Spark4ResolutionRules.scala
index 2ab3dc4945..461cbd0c93 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogUtils.scala
+++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/Spark4ResolutionRules.scala
@@ -16,28 +16,12 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.connector.catalog
+package org.apache.paimon.spark.catalyst.analysis
 
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.SparkConf
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.catalog.ExternalCatalog
-import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
-import org.apache.spark.sql.paimon.ReflectUtils
-
-object PaimonCatalogUtils {
-
-  def buildExternalCatalog(conf: SparkConf, hadoopConf: Configuration): ExternalCatalog = {
-    val externalCatalogClassName =
-      if (SparkSession.active.conf.get(CATALOG_IMPLEMENTATION.key).equals("hive")) {
-        "org.apache.spark.sql.hive.HiveExternalCatalog"
-      } else {
-        "org.apache.spark.sql.catalyst.catalog.InMemoryCatalog"
-      }
-    ReflectUtils.reflect[ExternalCatalog, SparkConf, Configuration](
-      externalCatalogClassName,
-      conf,
-      hadoopConf)
-  }
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
 
+case class Spark4ResolutionRules(session: SparkSession) extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan
 }
diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
index dfec4eb71f..eefddafdbf 100644
--- a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
+++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
@@ -18,6 +18,7 @@
 
 package org.apache.spark.sql.paimon.shims
 
+import org.apache.paimon.spark.catalyst.analysis.Spark4ResolutionRules
 import org.apache.paimon.spark.catalyst.parser.extensions.PaimonSpark4SqlExtensionsParser
 import org.apache.paimon.spark.data.{Spark4ArrayData, Spark4InternalRow, SparkArrayData, SparkInternalRow}
 import org.apache.paimon.types.{DataType, RowType}
@@ -25,7 +26,8 @@ import org.apache.paimon.types.{DataType, RowType}
 import org.apache.spark.sql.{Column, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.Aggregate
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, Table, TableCatalog}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.internal.ExpressionUtils
@@ -38,6 +40,11 @@ class Spark4Shim extends SparkShim {
   override def createSparkParser(delegate: ParserInterface): ParserInterface = {
     new PaimonSpark4SqlExtensionsParser(delegate)
   }
+
+  override def createCustomResolution(spark: SparkSession): Rule[LogicalPlan] = {
+    Spark4ResolutionRules(spark)
+  }
+
   override def createSparkInternalRow(rowType: RowType): SparkInternalRow = {
     new Spark4InternalRow(rowType)
   }


Reply via email to