This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new a1b82043b [spark] Fix spark desc command output with partition info 
(#2313)
a1b82043b is described below

commit a1b82043b17f58a525b746aa7f9ad530c94c73cd
Author: Yang Zhang <[email protected]>
AuthorDate: Sun Jan 11 16:08:48 2026 +0800

    [spark] Fix spark desc command output with partition info (#2313)
---
 .../fluss/spark/catalog/AbstractSparkTable.scala   | 11 +++++--
 .../org/apache/spark/sql/CatalogV2UtilShim.scala   | 28 +++++++++++++++++
 .../org/apache/fluss/spark/FlussCatalogTest.scala  | 35 ++++++++++++++++------
 .../apache/fluss/spark/FlussSparkTestBase.scala    | 11 ++++---
 4 files changed, 67 insertions(+), 18 deletions(-)

diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
index b1558dbe9..dc03e94b4 100644
--- 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
@@ -17,11 +17,12 @@
 
 package org.apache.fluss.spark.catalog
 
-import org.apache.fluss.config.{Configuration => FlussConfiguration}
-import org.apache.fluss.metadata.{TableInfo, TablePath}
+import org.apache.fluss.metadata.TableInfo
 import org.apache.fluss.spark.SparkConversions
 
+import org.apache.spark.sql.CatalogV2UtilShim
 import org.apache.spark.sql.connector.catalog.{Table, TableCapability}
+import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.types.StructType
 
 import java.util
@@ -34,7 +35,7 @@ abstract class AbstractSparkTable(tableInfo: TableInfo) 
extends Table {
     SparkConversions.toSparkDataType(tableInfo.getSchema.getRowType)
 
   protected lazy val _partitionSchema = new StructType(
-    _schema.fields.filter(tableInfo.getPartitionKeys.contains))
+    _schema.fields.filter(e => tableInfo.getPartitionKeys.contains(e.name)))
 
   override def name(): String = tableInfo.toString
 
@@ -43,4 +44,8 @@ abstract class AbstractSparkTable(tableInfo: TableInfo) 
extends Table {
   override def capabilities(): util.Set[TableCapability] = {
     Set(TableCapability.BATCH_WRITE).asJava
   }
+
+  override def partitioning(): Array[Transform] = {
+    CatalogV2UtilShim.toSparkTransforms(_partitionSchema.fields.map(_.name))
+  }
 }
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/CatalogV2UtilShim.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/CatalogV2UtilShim.scala
new file mode 100644
index 000000000..6715e41b9
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/CatalogV2UtilShim.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+import org.apache.spark.sql.connector.expressions.Transform
+
+/** Shim of org.apache.spark.sql.connector.catalog.CatalogV2Util. */
+object CatalogV2UtilShim {
+  def toSparkTransforms(colNames: Seq[String]): Array[Transform] = {
+    colNames.asTransforms
+  }
+}
diff --git 
a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussCatalogTest.scala
 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussCatalogTest.scala
index b30fc1e6c..4f2fc877f 100644
--- 
a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussCatalogTest.scala
+++ 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussCatalogTest.scala
@@ -21,6 +21,7 @@ import org.apache.fluss.metadata.{DatabaseDescriptor, Schema, 
TableDescriptor, T
 import org.apache.fluss.types.{DataTypes, RowType}
 
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.connector.catalog.Identifier
 import org.assertj.core.api.Assertions.{assertThat, assertThatList}
 
 import scala.collection.JavaConverters._
@@ -144,12 +145,18 @@ class FlussCatalogTest extends FlussSparkTestBase {
   }
 
   test("Catalog: check namespace and table created by admin") {
+    val dbName = "db_by_fluss_admin"
+    val tblName = "tbl_by_fluss_admin"
+    val catalog = 
spark.sessionState.catalogManager.currentCatalog.asInstanceOf[SparkCatalog]
+
+    // check namespace
     val dbDesc = DatabaseDescriptor.builder().comment("created by 
admin").build()
-    admin.createDatabase("db_by_admin", dbDesc, true).get()
-    checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) :: 
Row("db_by_admin") :: Nil)
+    admin.createDatabase(dbName, dbDesc, true).get()
+    assert(catalog.namespaceExists(Array(dbName)))
+    checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) :: Row(dbName) :: 
Nil)
 
-    sql("USE db_by_admin")
-    val tablePath = TablePath.of("db_by_admin", "tbl_by_admin")
+    // check table
+    val tablePath = TablePath.of(dbName, tblName)
     val rt = RowType
       .builder()
       .field("id", DataTypes.INT())
@@ -162,15 +169,25 @@ class FlussCatalogTest extends FlussSparkTestBase {
       .partitionedBy("pt")
       .build()
     admin.createTable(tablePath, tableDesc, false).get()
-    checkAnswer(sql("SHOW TABLES"), Row("db_by_admin", "tbl_by_admin", false) 
:: Nil)
+    assert(
+      catalog.tableExists(Identifier.of(Array(tablePath.getDatabaseName), 
tablePath.getTableName)))
+    val expectDescTable = Seq(
+      Row("id", "int", null),
+      Row("name", "string", null),
+      Row("pt", "string", null),
+      Row("# Partition Information", "", ""),
+      Row("# col_name", "data_type", "comment"),
+      Row("pt", "string", null)
+    )
     checkAnswer(
-      sql("DESC tbl_by_admin"),
-      Row("id", "int", null) :: Row("name", "string", null) :: Row("pt", 
"string", null) :: Nil)
+      sql(s"DESC $dbName.$tblName"),
+      expectDescTable
+    )
 
     admin.dropTable(tablePath, true).get()
-    checkAnswer(sql("SHOW TABLES"), Nil)
+    checkAnswer(sql(s"SHOW TABLES IN $dbName"), Nil)
 
-    admin.dropDatabase("db_by_admin", true, true).get()
+    admin.dropDatabase(dbName, true, true).get()
     checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) :: Nil)
   }
 }
diff --git 
a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
index 9f6d4e763..6123b0195 100644
--- 
a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
+++ 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
@@ -30,8 +30,6 @@ import org.apache.spark.SparkConf
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.test.SharedSparkSession
 import org.junit.jupiter.api.extension.RegisterExtension
-import org.scalactic.source.Position
-import org.scalatest.Tag
 
 import java.time.Duration
 
@@ -41,16 +39,17 @@ class FlussSparkTestBase extends QueryTest with 
SharedSparkSession {
 
   import FlussSparkTestBase._
 
-  protected val DEFAULT_DATABASE = "fluss";
+  protected val DEFAULT_CATALOG = "fluss_catalog"
+  protected val DEFAULT_DATABASE = "fluss"
 
   protected var conn: Connection = _
   protected var admin: Admin = _
 
   override protected def sparkConf: SparkConf = {
     super.sparkConf
-      .set("spark.sql.catalog.fluss_catalog", classOf[SparkCatalog].getName)
-      .set("spark.sql.catalog.fluss_catalog.bootstrap.servers", 
bootstrapServers)
-      .set("spark.sql.defaultCatalog", "fluss_catalog")
+      .set(s"spark.sql.catalog.$DEFAULT_CATALOG", 
classOf[SparkCatalog].getName)
+      .set(s"spark.sql.catalog.$DEFAULT_CATALOG.bootstrap.servers", 
bootstrapServers)
+      .set("spark.sql.defaultCatalog", DEFAULT_CATALOG)
   }
 
   override protected def beforeAll(): Unit = {

Reply via email to