This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new a1b82043b [spark] Fix spark desc command output with partition info (#2313)
a1b82043b is described below
commit a1b82043b17f58a525b746aa7f9ad530c94c73cd
Author: Yang Zhang <[email protected]>
AuthorDate: Sun Jan 11 16:08:48 2026 +0800
[spark] Fix spark desc command output with partition info (#2313)
---
.../fluss/spark/catalog/AbstractSparkTable.scala | 11 +++++--
.../org/apache/spark/sql/CatalogV2UtilShim.scala | 28 +++++++++++++++++
.../org/apache/fluss/spark/FlussCatalogTest.scala | 35 ++++++++++++++++------
.../apache/fluss/spark/FlussSparkTestBase.scala | 11 ++++---
4 files changed, 67 insertions(+), 18 deletions(-)
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
index b1558dbe9..dc03e94b4 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
@@ -17,11 +17,12 @@
package org.apache.fluss.spark.catalog
-import org.apache.fluss.config.{Configuration => FlussConfiguration}
-import org.apache.fluss.metadata.{TableInfo, TablePath}
+import org.apache.fluss.metadata.TableInfo
import org.apache.fluss.spark.SparkConversions
+import org.apache.spark.sql.CatalogV2UtilShim
import org.apache.spark.sql.connector.catalog.{Table, TableCapability}
+import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType
import java.util
@@ -34,7 +35,7 @@ abstract class AbstractSparkTable(tableInfo: TableInfo) extends Table {
SparkConversions.toSparkDataType(tableInfo.getSchema.getRowType)
protected lazy val _partitionSchema = new StructType(
- _schema.fields.filter(tableInfo.getPartitionKeys.contains))
+ _schema.fields.filter(e => tableInfo.getPartitionKeys.contains(e.name)))
override def name(): String = tableInfo.toString
@@ -43,4 +44,8 @@ abstract class AbstractSparkTable(tableInfo: TableInfo) extends Table {
override def capabilities(): util.Set[TableCapability] = {
Set(TableCapability.BATCH_WRITE).asJava
}
+
+ override def partitioning(): Array[Transform] = {
+ CatalogV2UtilShim.toSparkTransforms(_partitionSchema.fields.map(_.name))
+ }
}
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/CatalogV2UtilShim.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/CatalogV2UtilShim.scala
new file mode 100644
index 000000000..6715e41b9
--- /dev/null
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/CatalogV2UtilShim.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+import org.apache.spark.sql.connector.expressions.Transform
+
+/** Shim of org.apache.spark.sql.connector.catalog.CatalogV2Util. */
+object CatalogV2UtilShim {
+ def toSparkTransforms(colNames: Seq[String]): Array[Transform] = {
+ colNames.asTransforms
+ }
+}
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussCatalogTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussCatalogTest.scala
index b30fc1e6c..4f2fc877f 100644
--- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussCatalogTest.scala
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussCatalogTest.scala
@@ -21,6 +21,7 @@ import org.apache.fluss.metadata.{DatabaseDescriptor, Schema, TableDescriptor, T
import org.apache.fluss.types.{DataTypes, RowType}
import org.apache.spark.sql.Row
+import org.apache.spark.sql.connector.catalog.Identifier
import org.assertj.core.api.Assertions.{assertThat, assertThatList}
import scala.collection.JavaConverters._
@@ -144,12 +145,18 @@ class FlussCatalogTest extends FlussSparkTestBase {
}
test("Catalog: check namespace and table created by admin") {
+ val dbName = "db_by_fluss_admin"
+ val tblName = "tbl_by_fluss_admin"
+ val catalog =
spark.sessionState.catalogManager.currentCatalog.asInstanceOf[SparkCatalog]
+
+ // check namespace
val dbDesc = DatabaseDescriptor.builder().comment("created by
admin").build()
- admin.createDatabase("db_by_admin", dbDesc, true).get()
- checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) ::
Row("db_by_admin") :: Nil)
+ admin.createDatabase(dbName, dbDesc, true).get()
+ assert(catalog.namespaceExists(Array(dbName)))
+ checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) :: Row(dbName) ::
Nil)
- sql("USE db_by_admin")
- val tablePath = TablePath.of("db_by_admin", "tbl_by_admin")
+ // check table
+ val tablePath = TablePath.of(dbName, tblName)
val rt = RowType
.builder()
.field("id", DataTypes.INT())
@@ -162,15 +169,25 @@ class FlussCatalogTest extends FlussSparkTestBase {
.partitionedBy("pt")
.build()
admin.createTable(tablePath, tableDesc, false).get()
- checkAnswer(sql("SHOW TABLES"), Row("db_by_admin", "tbl_by_admin", false)
:: Nil)
+ assert(
+ catalog.tableExists(Identifier.of(Array(tablePath.getDatabaseName),
tablePath.getTableName)))
+ val expectDescTable = Seq(
+ Row("id", "int", null),
+ Row("name", "string", null),
+ Row("pt", "string", null),
+ Row("# Partition Information", "", ""),
+ Row("# col_name", "data_type", "comment"),
+ Row("pt", "string", null)
+ )
checkAnswer(
- sql("DESC tbl_by_admin"),
- Row("id", "int", null) :: Row("name", "string", null) :: Row("pt",
"string", null) :: Nil)
+ sql(s"DESC $dbName.$tblName"),
+ expectDescTable
+ )
admin.dropTable(tablePath, true).get()
- checkAnswer(sql("SHOW TABLES"), Nil)
+ checkAnswer(sql(s"SHOW TABLES IN $dbName"), Nil)
- admin.dropDatabase("db_by_admin", true, true).get()
+ admin.dropDatabase(dbName, true, true).get()
checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) :: Nil)
}
}
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
index 9f6d4e763..6123b0195 100644
--- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
@@ -30,8 +30,6 @@ import org.apache.spark.SparkConf
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.test.SharedSparkSession
import org.junit.jupiter.api.extension.RegisterExtension
-import org.scalactic.source.Position
-import org.scalatest.Tag
import java.time.Duration
@@ -41,16 +39,17 @@ class FlussSparkTestBase extends QueryTest with SharedSparkSession {
import FlussSparkTestBase._
- protected val DEFAULT_DATABASE = "fluss";
+ protected val DEFAULT_CATALOG = "fluss_catalog"
+ protected val DEFAULT_DATABASE = "fluss"
protected var conn: Connection = _
protected var admin: Admin = _
override protected def sparkConf: SparkConf = {
super.sparkConf
- .set("spark.sql.catalog.fluss_catalog", classOf[SparkCatalog].getName)
- .set("spark.sql.catalog.fluss_catalog.bootstrap.servers",
bootstrapServers)
- .set("spark.sql.defaultCatalog", "fluss_catalog")
+ .set(s"spark.sql.catalog.$DEFAULT_CATALOG",
classOf[SparkCatalog].getName)
+ .set(s"spark.sql.catalog.$DEFAULT_CATALOG.bootstrap.servers",
bootstrapServers)
+ .set("spark.sql.defaultCatalog", DEFAULT_CATALOG)
}
override protected def beforeAll(): Unit = {