This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 053b8f746 [spark] Add Spark show/add/drop partition support (#2314)
053b8f746 is described below
commit 053b8f7460f74ddfb0bf344928f3d418ffb46b72
Author: Yang Zhang <[email protected]>
AuthorDate: Sat Jan 17 10:45:35 2026 +0800
[spark] Add Spark show/add/drop partition support (#2314)
---
.../org/apache/fluss/spark/SparkCatalog.scala | 3 +-
.../scala/org/apache/fluss/spark/SparkTable.scala | 9 ++-
.../fluss/spark/catalog/AbstractSparkTable.scala | 7 +-
.../catalog/SupportsFlussPartitionManagement.scala | 78 +++++++++++++++++++++-
...ussCatalogTest.scala => SparkCatalogTest.scala} | 66 +++++++++++++++++-
5 files changed, 151 insertions(+), 12 deletions(-)
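For readers skimming the diff, the net effect is that Spark's standard partition DDL now works against Fluss partitioned tables. A minimal Spark SQL session (table and column names taken from the tests at the end of this diff; `spark` is an ordinary SparkSession bound to the Fluss catalog) would look roughly like:

    spark.sql("CREATE TABLE t (id int, name string, pt1 string, pt2 int) PARTITIONED BY (pt1, pt2)")
    spark.sql("ALTER TABLE t ADD PARTITION (pt1 = 'b', pt2 = 1)")
    spark.sql("SHOW PARTITIONS t").show() // pt1=b/pt2=1
    spark.sql("ALTER TABLE t DROP PARTITION (pt1 = 'b', pt2 = 1)")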
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala
index c1521ebfe..842ef9b39 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala
@@ -50,7 +50,8 @@ class SparkCatalog extends TableCatalog with SupportsFlussNamespaces with WithFl
override def loadTable(ident: Identifier): Table = {
try {
val tablePath = toTablePath(ident)
- SparkTable(tablePath, admin.getTableInfo(tablePath).get(), flussConfig)
+ val table = new SparkTable(tablePath, admin.getTableInfo(tablePath).get(), flussConfig, admin)
+ table
} catch {
case e: ExecutionException if e.getCause.isInstanceOf[TableNotExistException] =>
throw new NoSuchTableException(ident)
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
index 6d31f6fbc..93c51d1e0 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
@@ -17,6 +17,7 @@
package org.apache.fluss.spark
+import org.apache.fluss.client.admin.Admin
import org.apache.fluss.config.{Configuration => FlussConfiguration}
import org.apache.fluss.metadata.{TableInfo, TablePath}
import org.apache.fluss.spark.catalog.{AbstractSparkTable, SupportsFlussPartitionManagement}
@@ -25,8 +26,12 @@ import org.apache.fluss.spark.write.{FlussAppendWriteBuilder, FlussUpsertWriteBu
import org.apache.spark.sql.connector.catalog.SupportsWrite
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-case class SparkTable(tablePath: TablePath, tableInfo: TableInfo, flussConfig: FlussConfiguration)
- extends AbstractSparkTable(tableInfo)
+class SparkTable(
+ tablePath: TablePath,
+ tableInfo: TableInfo,
+ flussConfig: FlussConfiguration,
+ admin: Admin)
+ extends AbstractSparkTable(admin, tableInfo)
with SupportsFlussPartitionManagement
with SupportsWrite {
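Note the switch from a case class to a plain class: a case class would fold the new Admin constructor parameter, a live client handle, into the generated equals/hashCode/toString. With a plain class, the catalog's loadTable (first hunk above) simply constructs the table explicitly:

    // Sketch of the new call site, mirroring the SparkCatalog hunk above.
    val table = new SparkTable(tablePath, admin.getTableInfo(tablePath).get(), flussConfig, admin)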
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
index dc03e94b4..33f73f600 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
@@ -17,7 +17,9 @@
package org.apache.fluss.spark.catalog
-import org.apache.fluss.metadata.TableInfo
+import org.apache.fluss.client.admin.Admin
+import org.apache.fluss.config.{Configuration => FlussConfiguration}
+import org.apache.fluss.metadata.{TableInfo, TablePath}
import org.apache.fluss.spark.SparkConversions
import org.apache.spark.sql.CatalogV2UtilShim
@@ -29,8 +31,7 @@ import java.util
import scala.collection.JavaConverters._
-abstract class AbstractSparkTable(tableInfo: TableInfo) extends Table {
-
+abstract class AbstractSparkTable(val admin: Admin, val tableInfo: TableInfo) extends Table {
protected lazy val _schema: StructType =
SparkConversions.toSparkDataType(tableInfo.getSchema.getRowType)
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/SupportsFlussPartitionManagement.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/SupportsFlussPartitionManagement.scala
index ed888e5fe..8d9578b98 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/SupportsFlussPartitionManagement.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/SupportsFlussPartitionManagement.scala
@@ -17,22 +17,33 @@
package org.apache.fluss.spark.catalog
+import org.apache.fluss.metadata.PartitionSpec
+
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, SpecificInternalRow}
+import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement
import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
import java.util
+import scala.collection.JavaConverters._
+
trait SupportsFlussPartitionManagement extends AbstractSparkTable with SupportsPartitionManagement {
+ import SupportsFlussPartitionManagement._
override def partitionSchema(): StructType = _partitionSchema
override def createPartition(ident: InternalRow, properties: util.Map[String, String]): Unit = {
- throw new UnsupportedOperationException("Creating partition is not supported")
+ val partitionSpec = toPartitionSpec(ident, partitionSchema())
+ admin.createPartition(tableInfo.getTablePath, partitionSpec, false).get()
}
override def dropPartition(ident: InternalRow): Boolean = {
- throw new UnsupportedOperationException("Dropping partition is not
supported")
+ val partitionSpec = toPartitionSpec(ident, partitionSchema())
+ admin.dropPartition(tableInfo.getTablePath, partitionSpec, false).get()
+ true
}
override def replacePartitionMetadata(
@@ -48,6 +59,67 @@ trait SupportsFlussPartitionManagement extends AbstractSparkTable with SupportsP
override def listPartitionIdentifiers(
names: Array[String],
ident: InternalRow): Array[InternalRow] = {
- throw new UnsupportedOperationException("Listing partition is not
supported")
+ assert(
+ names.length == ident.numFields,
+ s"Number of partition names (${names.length}) must be equal to " +
+ s"the number of partition values (${ident.numFields})."
+ )
+ val schema = partitionSchema()
+ assert(
+ names.forall(fieldName => schema.fieldNames.contains(fieldName)),
+ s"Some partition names ${names.mkString("[", ", ", "]")} don't belong to
" +
+ s"the partition schema '${schema.sql}'."
+ )
+
+ val flussPartitionRows = admin
+ .listPartitionInfos(tableInfo.getTablePath)
+ .get()
+ .asScala
+ .map(p => toInternalRow(p.getPartitionSpec, schema))
+
+ val indexes = names.map(schema.fieldIndex)
+ val dataTypes = names.map(schema(_).dataType)
+ val currentRow = new GenericInternalRow(new Array[Any](names.length))
+ flussPartitionRows.filter {
+ partRow =>
+ for (i <- names.indices) {
+ currentRow.values(i) = partRow.get(indexes(i), dataTypes(i))
+ }
+ currentRow == ident
+ }.toArray
+ }
+}
+
+object SupportsFlussPartitionManagement {
+ private def toInternalRow(
+ partitionSpec: PartitionSpec,
+ partitionSchema: StructType): InternalRow = {
+ val row = new SpecificInternalRow(partitionSchema)
+ for ((field, i) <- partitionSchema.fields.zipWithIndex) {
+ val partValue = partitionSpec.getSpecMap.get(field.name)
+ val value = field.dataType match {
+ case dt =>
+ // TODO Support more types when needed.
+ PhysicalDataType(field.dataType) match {
+ case PhysicalBooleanType => partValue.toBoolean
+ case PhysicalIntegerType => partValue.toInt
+ case PhysicalDoubleType => partValue.toDouble
+ case PhysicalFloatType => partValue.toFloat
+ case PhysicalLongType => partValue.toLong
+ case PhysicalShortType => partValue.toShort
+ case PhysicalStringType => UTF8String.fromString(partValue)
+ }
+ }
+ row.update(i, value)
+ }
+ row
+ }
+
+ private def toPartitionSpec(row: InternalRow, partitionSchema: StructType): PartitionSpec = {
+ val map = partitionSchema.fields.zipWithIndex.map {
+ case (field, idx) =>
+ (field.name, row.get(idx, field.dataType).toString)
+ }.toMap
+ new PartitionSpec(map.asJava)
}
}
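The two private helpers are the heart of this file: toInternalRow parses a Fluss PartitionSpec (a string-keyed, string-valued map) back into a typed Catalyst row, and toPartitionSpec does the reverse. A standalone sketch of the reverse direction, using a hypothetical two-column partition schema and hypothetical values, shows the mapping:

    import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
    import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
    import org.apache.spark.unsafe.types.UTF8String

    // Each partition column is read with its Catalyst type and rendered as
    // the string Fluss expects, exactly as toPartitionSpec above does.
    val schema = new StructType().add("pt1", StringType).add("pt2", IntegerType)
    val ident = new GenericInternalRow(Array[Any](UTF8String.fromString("b"), 1))
    val spec = schema.fields.zipWithIndex.map {
      case (field, i) => field.name -> ident.get(i, field.dataType).toString
    }.toMap
    // spec == Map("pt1" -> "b", "pt2" -> "1"), handed to Fluss as
    // new PartitionSpec(spec.asJava)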
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussCatalogTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala
similarity index 72%
rename from fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussCatalogTest.scala
rename to fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala
index 4f2fc877f..4c4bec355 100644
--- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussCatalogTest.scala
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala
@@ -17,16 +17,17 @@
package org.apache.fluss.spark
-import org.apache.fluss.metadata.{DatabaseDescriptor, Schema, TableDescriptor, TablePath}
+import org.apache.fluss.metadata._
import org.apache.fluss.types.{DataTypes, RowType}
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException
import org.apache.spark.sql.connector.catalog.Identifier
import org.assertj.core.api.Assertions.{assertThat, assertThatList}
import scala.collection.JavaConverters._
-class FlussCatalogTest extends FlussSparkTestBase {
+class SparkCatalogTest extends FlussSparkTestBase {
test("Catalog: namespaces") {
// Always a default database 'fluss'.
@@ -190,4 +191,63 @@ class FlussCatalogTest extends FlussSparkTestBase {
admin.dropDatabase(dbName, true, true).get()
checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) :: Nil)
}
+
+ test("Partition: show partitions") {
+ withTable("t") {
+ sql(s"CREATE TABLE t (id int, name string, pt1 string, pt2 int)
PARTITIONED BY (pt1, pt2)")
+ sql(s"INSERT INTO t values(1, 'a', 'a', 1), (2, 'b', 'a', 2), (3, 'c',
'c', 3)")
+
+ var expect = Seq(Row("pt1=a/pt2=1"), Row("pt1=a/pt2=2"), Row("pt1=c/pt2=3"))
+ checkAnswer(sql(s"SHOW PARTITIONS t"), expect)
+ expect = Seq(Row("pt1=a/pt2=1"), Row("pt1=a/pt2=2"))
+ checkAnswer(sql(s"SHOW PARTITIONS t PARTITION (pt1 = 'a')"), expect)
+ }
+ }
+
+ test("Partition: add partition") {
+ withTable("t") {
+ sql("CREATE TABLE t (id int, name string, pt1 string, pt2 int)
PARTITIONED BY (pt1, pt2)")
+
+ // add from sparksql
+ sql(s"ALTER TABLE t ADD PARTITION (pt1 = 'b', pt2 = 1)")
+ var expect = Seq(Row("pt1=b/pt2=1"))
+ checkAnswer(sql(s"SHOW PARTITIONS t"), expect)
+ sql(s"ALTER TABLE t ADD IF NOT EXISTS PARTITION (pt1 = 'b', pt2 = 1)")
+ checkAnswer(sql(s"SHOW PARTITIONS t"), expect)
+
+ // add from fluss
+ val map = Map("pt1" -> "b", "pt2" -> "2")
+ admin.createPartition(createTablePath("t"), new PartitionSpec(map.asJava), false).get()
+ expect = Seq(Row("pt1=b/pt2=1"), Row("pt1=b/pt2=2"))
+ checkAnswer(sql(s"SHOW PARTITIONS t"), expect)
+
+ intercept[AnalysisException](sql(s"ALTER TABLE t ADD PARTITION (pt1 = 'b', pt2 = 1)"))
+ intercept[AnalysisException](sql(s"ALTER TABLE t ADD PARTITION (pt1 = 'b', pt3 = 1)"))
+ intercept[PartitionsAlreadyExistException](
+ sql(s"ALTER TABLE t ADD PARTITION (pt1 = 'b', pt2 = 1)"))
+ }
+ }
+
+ test("Partition: drop partition") {
+ withTable("t") {
+ sql("CREATE TABLE t (id int, name string, pt1 string, pt2 int)
PARTITIONED BY (pt1, pt2)")
+ sql(s"INSERT INTO t values(1, 'a', 'a', 1), (2, 'b', 'a', 2), (3, 'c',
'c', 3)")
+
+ // drop from sparksql
+ sql(s"ALTER TABLE t DROP PARTITION (pt1 = 'a', pt2 = 2)")
+ var expect = Seq(Row("pt1=a/pt2=1"), Row("pt1=c/pt2=3"))
+ checkAnswer(sql(s"SHOW PARTITIONS t"), expect)
+ sql(s"ALTER TABLE t DROP IF EXISTS PARTITION (pt1 = 'a', pt2 = 2)")
+ checkAnswer(sql(s"SHOW PARTITIONS t"), expect)
+
+ // drop from fluss
+ val map = Map("pt1" -> "c", "pt2" -> "3")
+ admin.dropPartition(createTablePath("t"), new PartitionSpec(map.asJava), false).get()
+ expect = Seq(Row("pt1=a/pt2=1"))
+ checkAnswer(sql(s"SHOW PARTITIONS t"), expect)
+
+ // Spark does not support dropping a partial partition spec
+ intercept[AnalysisException](sql(s"ALTER TABLE t DROP PARTITION (pt1 = 'a')"))
+ }
+ }
}
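As the "add from fluss" and "drop from fluss" steps above demonstrate, partitions can equally be managed through the Fluss Admin client directly, and Spark's SHOW PARTITIONS observes the result. A rough sketch outside the test harness, assuming `admin` is an existing org.apache.fluss.client.admin.Admin and `tablePath` the TablePath of table t (the trailing boolean is the ignore flag the tests pass as false):

    import org.apache.fluss.metadata.PartitionSpec
    import scala.collection.JavaConverters._

    val spec = new PartitionSpec(Map("pt1" -> "b", "pt2" -> "2").asJava)
    admin.createPartition(tablePath, spec, false).get() // block on the async result
    spark.sql("SHOW PARTITIONS t").show()               // now includes pt1=b/pt2=2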