This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 053b8f746 [spark] Add Spark show/add/drop partition support (#2314)
053b8f746 is described below

commit 053b8f7460f74ddfb0bf344928f3d418ffb46b72
Author: Yang Zhang <[email protected]>
AuthorDate: Sat Jan 17 10:45:35 2026 +0800

    [spark] Add Spark show/add/drop partition support (#2314)
---
 .../org/apache/fluss/spark/SparkCatalog.scala      |  3 +-
 .../scala/org/apache/fluss/spark/SparkTable.scala  |  9 ++-
 .../fluss/spark/catalog/AbstractSparkTable.scala   |  7 +-
 .../catalog/SupportsFlussPartitionManagement.scala | 78 +++++++++++++++++++++-
 ...ussCatalogTest.scala => SparkCatalogTest.scala} | 66 +++++++++++++++++-
 5 files changed, 151 insertions(+), 12 deletions(-)

diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala
index c1521ebfe..842ef9b39 100644
--- 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala
@@ -50,7 +50,8 @@ class SparkCatalog extends TableCatalog with 
SupportsFlussNamespaces with WithFl
   override def loadTable(ident: Identifier): Table = {
     try {
       val tablePath = toTablePath(ident)
-      SparkTable(tablePath, admin.getTableInfo(tablePath).get(), flussConfig)
+      val table = new SparkTable(tablePath, 
admin.getTableInfo(tablePath).get(), flussConfig, admin)
+      table
     } catch {
       case e: ExecutionException if 
e.getCause.isInstanceOf[TableNotExistException] =>
         throw new NoSuchTableException(ident)
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
index 6d31f6fbc..93c51d1e0 100644
--- 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
@@ -17,6 +17,7 @@
 
 package org.apache.fluss.spark
 
+import org.apache.fluss.client.admin.Admin
 import org.apache.fluss.config.{Configuration => FlussConfiguration}
 import org.apache.fluss.metadata.{TableInfo, TablePath}
 import org.apache.fluss.spark.catalog.{AbstractSparkTable, 
SupportsFlussPartitionManagement}
@@ -25,8 +26,12 @@ import 
org.apache.fluss.spark.write.{FlussAppendWriteBuilder, FlussUpsertWriteBu
 import org.apache.spark.sql.connector.catalog.SupportsWrite
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
 
-case class SparkTable(tablePath: TablePath, tableInfo: TableInfo, flussConfig: 
FlussConfiguration)
-  extends AbstractSparkTable(tableInfo)
+class SparkTable(
+    tablePath: TablePath,
+    tableInfo: TableInfo,
+    flussConfig: FlussConfiguration,
+    admin: Admin)
+  extends AbstractSparkTable(admin, tableInfo)
   with SupportsFlussPartitionManagement
   with SupportsWrite {
 
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
index dc03e94b4..33f73f600 100644
--- 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
@@ -17,7 +17,9 @@
 
 package org.apache.fluss.spark.catalog
 
-import org.apache.fluss.metadata.TableInfo
+import org.apache.fluss.client.admin.Admin
+import org.apache.fluss.config.{Configuration => FlussConfiguration}
+import org.apache.fluss.metadata.{TableInfo, TablePath}
 import org.apache.fluss.spark.SparkConversions
 
 import org.apache.spark.sql.CatalogV2UtilShim
@@ -29,8 +31,7 @@ import java.util
 
 import scala.collection.JavaConverters._
 
-abstract class AbstractSparkTable(tableInfo: TableInfo) extends Table {
-
+abstract class AbstractSparkTable(val admin: Admin, val tableInfo: TableInfo) 
extends Table {
   protected lazy val _schema: StructType =
     SparkConversions.toSparkDataType(tableInfo.getSchema.getRowType)
 
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/SupportsFlussPartitionManagement.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/SupportsFlussPartitionManagement.scala
index ed888e5fe..8d9578b98 100644
--- 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/SupportsFlussPartitionManagement.scala
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/SupportsFlussPartitionManagement.scala
@@ -17,22 +17,33 @@
 
 package org.apache.fluss.spark.catalog
 
+import org.apache.fluss.metadata.PartitionSpec
+
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, 
SpecificInternalRow}
+import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
 
 import java.util
 
+import scala.collection.JavaConverters._
+
 trait SupportsFlussPartitionManagement extends AbstractSparkTable with 
SupportsPartitionManagement {
+  import SupportsFlussPartitionManagement._
 
   override def partitionSchema(): StructType = _partitionSchema
 
   override def createPartition(ident: InternalRow, properties: 
util.Map[String, String]): Unit = {
-    throw new UnsupportedOperationException("Creating partition is not 
supported")
+    val partitionSpec = toPartitionSpec(ident, partitionSchema())
+    admin.createPartition(tableInfo.getTablePath, partitionSpec, false).get()
   }
 
   override def dropPartition(ident: InternalRow): Boolean = {
-    throw new UnsupportedOperationException("Dropping partition is not 
supported")
+    val partitionSpec = toPartitionSpec(ident, partitionSchema())
+    admin.dropPartition(tableInfo.getTablePath, partitionSpec, false).get()
+    true
   }
 
   override def replacePartitionMetadata(
@@ -48,6 +59,67 @@ trait SupportsFlussPartitionManagement extends 
AbstractSparkTable with SupportsP
   override def listPartitionIdentifiers(
       names: Array[String],
       ident: InternalRow): Array[InternalRow] = {
-    throw new UnsupportedOperationException("Listing partition is not 
supported")
+    assert(
+      names.length == ident.numFields,
+      s"Number of partition names (${names.length}) must be equal to " +
+        s"the number of partition values (${ident.numFields})."
+    )
+    val schema = partitionSchema()
+    assert(
+      names.forall(fieldName => schema.fieldNames.contains(fieldName)),
+      s"Some partition names ${names.mkString("[", ", ", "]")} don't belong to 
" +
+        s"the partition schema '${schema.sql}'."
+    )
+
+    val flussPartitionRows = admin
+      .listPartitionInfos(tableInfo.getTablePath)
+      .get()
+      .asScala
+      .map(p => toInternalRow(p.getPartitionSpec, schema))
+
+    val indexes = names.map(schema.fieldIndex)
+    val dataTypes = names.map(schema(_).dataType)
+    val currentRow = new GenericInternalRow(new Array[Any](names.length))
+    flussPartitionRows.filter {
+      partRow =>
+        for (i <- names.indices) {
+          currentRow.values(i) = partRow.get(indexes(i), dataTypes(i))
+        }
+        currentRow == ident
+    }.toArray
+  }
+}
+
+object SupportsFlussPartitionManagement {
+  private def toInternalRow(
+      partitionSpec: PartitionSpec,
+      partitionSchema: StructType): InternalRow = {
+    val row = new SpecificInternalRow(partitionSchema)
+    for ((field, i) <- partitionSchema.fields.zipWithIndex) {
+      val partValue = partitionSpec.getSpecMap.get(field.name)
+      val value = field.dataType match {
+        case dt =>
+          // TODO Support more types when needed.
+          PhysicalDataType(field.dataType) match {
+            case PhysicalBooleanType => partValue.toBoolean
+            case PhysicalIntegerType => partValue.toInt
+            case PhysicalDoubleType => partValue.toDouble
+            case PhysicalFloatType => partValue.toFloat
+            case PhysicalLongType => partValue.toLong
+            case PhysicalShortType => partValue.toShort
+            case PhysicalStringType => UTF8String.fromString(partValue)
+          }
+      }
+      row.update(i, value)
+    }
+    row
+  }
+
+  private def toPartitionSpec(row: InternalRow, partitionSchema: StructType): 
PartitionSpec = {
+    val map = partitionSchema.fields.zipWithIndex.map {
+      case (field, idx) =>
+        (field.name, row.get(idx, field.dataType).toString)
+    }.toMap
+    new PartitionSpec(map.asJava)
   }
 }
diff --git 
a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussCatalogTest.scala
 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala
similarity index 72%
rename from 
fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussCatalogTest.scala
rename to 
fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala
index 4f2fc877f..4c4bec355 100644
--- 
a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussCatalogTest.scala
+++ 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala
@@ -17,16 +17,17 @@
 
 package org.apache.fluss.spark
 
-import org.apache.fluss.metadata.{DatabaseDescriptor, Schema, TableDescriptor, 
TablePath}
+import org.apache.fluss.metadata._
 import org.apache.fluss.types.{DataTypes, RowType}
 
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.assertj.core.api.Assertions.{assertThat, assertThatList}
 
 import scala.collection.JavaConverters._
 
-class FlussCatalogTest extends FlussSparkTestBase {
+class SparkCatalogTest extends FlussSparkTestBase {
 
   test("Catalog: namespaces") {
     // Always a default database 'fluss'.
@@ -190,4 +191,63 @@ class FlussCatalogTest extends FlussSparkTestBase {
     admin.dropDatabase(dbName, true, true).get()
     checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) :: Nil)
   }
+
+  test("Partition: show partitions") {
+    withTable("t") {
+      sql(s"CREATE TABLE t (id int, name string, pt1 string, pt2 int) 
PARTITIONED BY (pt1, pt2)")
+      sql(s"INSERT INTO t values(1, 'a', 'a', 1), (2, 'b', 'a', 2), (3, 'c', 
'c', 3)")
+
+      var expect = Seq(Row("pt1=a/pt2=1"), Row("pt1=a/pt2=2"), 
Row("pt1=c/pt2=3"))
+      checkAnswer(sql(s"SHOW PARTITIONS t"), expect)
+      expect = Seq(Row("pt1=a/pt2=1"), Row("pt1=a/pt2=2"))
+      checkAnswer(sql(s"SHOW PARTITIONS t PARTITION (pt1 = 'a')"), expect)
+    }
+  }
+
+  test("Partition: add partition") {
+    withTable("t") {
+      sql("CREATE TABLE t (id int, name string, pt1 string, pt2 int) 
PARTITIONED BY (pt1, pt2)")
+
+      // add from sparksql
+      sql(s"ALTER TABLE t ADD PARTITION (pt1 = 'b', pt2 = 1)")
+      var expect = Seq(Row("pt1=b/pt2=1"))
+      checkAnswer(sql(s"SHOW PARTITIONS t"), expect)
+      sql(s"ALTER TABLE t ADD IF NOT EXISTS PARTITION (pt1 = 'b', pt2 = 1)")
+      checkAnswer(sql(s"SHOW PARTITIONS t"), expect)
+
+      // add from fluss
+      val map = Map("pt1" -> "b", "pt2" -> "2")
+      admin.createPartition(createTablePath("t"), new 
PartitionSpec(map.asJava), false).get()
+      expect = Seq(Row("pt1=b/pt2=1"), Row("pt1=b/pt2=2"))
+      checkAnswer(sql(s"SHOW PARTITIONS t"), expect)
+
+      intercept[AnalysisException](sql(s"ALTER TABLE t ADD PARTITION (pt1 = 
'b', pt2 = 1)"))
+      intercept[AnalysisException](sql(s"ALTER TABLE t ADD PARTITION (pt1 = 
'b', pt3 = 1)"))
+      intercept[PartitionsAlreadyExistException](
+        sql(s"ALTER TABLE t ADD PARTITION (pt1 = 'b', pt2 = 1)"))
+    }
+  }
+
+  test("Partition: drop partition") {
+    withTable("t") {
+      sql("CREATE TABLE t (id int, name string, pt1 string, pt2 int) 
PARTITIONED BY (pt1, pt2)")
+      sql(s"INSERT INTO t values(1, 'a', 'a', 1), (2, 'b', 'a', 2), (3, 'c', 
'c', 3)")
+
+      // drop from sparksql
+      sql(s"ALTER TABLE t DROP PARTITION (pt1 = 'a', pt2 = 2)")
+      var expect = Seq(Row("pt1=a/pt2=1"), Row("pt1=c/pt2=3"))
+      checkAnswer(sql(s"SHOW PARTITIONS t"), expect)
+      sql(s"ALTER TABLE t DROP IF EXISTS PARTITION (pt1 = 'a', pt2 = 2)")
+      checkAnswer(sql(s"SHOW PARTITIONS t"), expect)
+
+      // drop from fluss
+      val map = Map("pt1" -> "c", "pt2" -> "3")
+      admin.dropPartition(createTablePath("t"), new PartitionSpec(map.asJava), 
false).get()
+      expect = Seq(Row("pt1=a/pt2=1"))
+      checkAnswer(sql(s"SHOW PARTITIONS t"), expect)
+
+      // spark does not support drop partial partition
+      intercept[AnalysisException](sql(s"ALTER TABLE t DROP PARTITION (pt1 = 
'a')"))
+    }
+  }
 }

Reply via email to