This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 9592d0391 [spark] Add Spark SQL ALTER table support for set/remove table properties (#2437)
9592d0391 is described below
commit 9592d0391437de105bae0e760c103ba0c35bc014
Author: Yang Zhang <[email protected]>
AuthorDate: Mon Jan 26 16:05:11 2026 +0800
[spark] Add Spark SQL ALTER table support for set/remove table properties (#2437)
---
.../org/apache/fluss/spark/SparkCatalog.scala | 14 ++++-
.../org/apache/fluss/spark/SparkConversions.scala | 18 +++++--
.../apache/fluss/spark/FlussSparkTestBase.scala | 3 +-
.../org/apache/fluss/spark/SparkCatalogTest.scala | 59 ++++++++++++++++++++++
4 files changed, 89 insertions(+), 5 deletions(-)
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala
index 842ef9b39..cdc2206f2 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala
@@ -80,7 +80,19 @@ class SparkCatalog extends TableCatalog with SupportsFlussNamespaces with WithFl
}
override def alterTable(ident: Identifier, changes: TableChange*): Table = {
- throw new UnsupportedOperationException("Altering table is not supported")
+ try {
+ admin
+ .alterTable(toTablePath(ident), SparkConversions.toFlussTableChanges(changes).asJava, false)
+ .get()
+ loadTable(ident)
+ } catch {
+ case e: ExecutionException =>
+ if (e.getCause.isInstanceOf[TableNotExistException]) {
+ throw new NoSuchTableException(ident)
+ } else {
+ throw e
+ }
+ }
}
override def dropTable(ident: Identifier): Boolean = {
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConversions.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConversions.scala
index f85e33f81..eeb726678 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConversions.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConversions.scala
@@ -25,6 +25,7 @@ import org.apache.fluss.types.RowType
import org.apache.spark.sql.FlussIdentityTransform
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.connector.catalog.TableChange
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType
@@ -54,7 +55,7 @@ object SparkConversions {
tableDescriptorBuilder.partitionedBy(partitionKey: _*)
val primaryKeys = if (caseInsensitiveProps.contains(PRIMARY_KEY.key)) {
- val pks = caseInsensitiveProps.get(PRIMARY_KEY.key).get.split(",")
+ val pks = caseInsensitiveProps.get(PRIMARY_KEY.key).get.split(",").map(_.trim)
schemaBuilder.primaryKey(pks: _*)
pks
} else {
@@ -64,7 +65,7 @@ object SparkConversions {
if (caseInsensitiveProps.contains(BUCKET_NUMBER.key)) {
val bucketNum = caseInsensitiveProps.get(BUCKET_NUMBER.key).get.toInt
val bucketKeys = if (caseInsensitiveProps.contains(BUCKET_KEY.key)) {
- caseInsensitiveProps.get(BUCKET_KEY.key).get.split(",")
+ caseInsensitiveProps.get(BUCKET_KEY.key).get.split(",").map(_.trim)
} else {
primaryKeys.filterNot(partitionKey.contains)
}
@@ -76,7 +77,7 @@ object SparkConversions {
}
val (tableProps, customProps) =
- caseInsensitiveProps.filterNot(SPARK_TABLE_OPTIONS.contains).partition {
+ caseInsensitiveProps.filterNot(e => SPARK_TABLE_OPTIONS.contains(e._1)).partition {
case (key, _) => key.startsWith(FlussConfigUtils.TABLE_PREFIX)
}
@@ -97,4 +98,15 @@ object SparkConversions {
}
partitionKeys.toArray
}
+
+ def toFlussTableChanges(changes: Seq[TableChange]): Seq[org.apache.fluss.metadata.TableChange] = {
+ changes.map {
+ case p: TableChange.SetProperty =>
+ org.apache.fluss.metadata.TableChange.set(p.property(), p.value())
+ case p: TableChange.RemoveProperty =>
+ org.apache.fluss.metadata.TableChange.reset(p.property())
+ // TODO Add full support for table changes
+ case _ => throw new UnsupportedOperationException("Unsupported table change")
+ }
+ }
}
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
index f9b98fe7c..6dd2c4d86 100644
--- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
@@ -22,7 +22,7 @@ import org.apache.fluss.client.admin.Admin
import org.apache.fluss.client.table.Table
import org.apache.fluss.client.table.scanner.log.LogScanner
import org.apache.fluss.config.{ConfigOptions, Configuration}
-import org.apache.fluss.metadata.{TableDescriptor, TablePath}
+import org.apache.fluss.metadata.{DataLakeFormat, TableDescriptor, TablePath}
import org.apache.fluss.row.InternalRow
import org.apache.fluss.server.testutils.FlussClusterExtension
@@ -107,6 +107,7 @@ object FlussSparkTestBase {
.setClusterConf(
new Configuration()
.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1))
+ .set(ConfigOptions.DATALAKE_FORMAT, DataLakeFormat.PAIMON)
)
.setNumOfTabletServers(3)
.build
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala
index 4c4bec355..242e7998e 100644
--- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala
@@ -17,6 +17,8 @@
package org.apache.fluss.spark
+import org.apache.fluss.config.ConfigOptions
+import org.apache.fluss.exception.InvalidAlterTableException
import org.apache.fluss.metadata._
import org.apache.fluss.types.{DataTypes, RowType}
@@ -24,6 +26,9 @@ import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException
import org.apache.spark.sql.connector.catalog.Identifier
import org.assertj.core.api.Assertions.{assertThat, assertThatList}
+import org.scalatest.matchers.should.Matchers.{a, convertToAnyShouldWrapper}
+
+import java.util.concurrent.ExecutionException
import scala.collection.JavaConverters._
@@ -192,6 +197,60 @@ class SparkCatalogTest extends FlussSparkTestBase {
checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) :: Nil)
}
+ test("Catalog: set/remove table properties") {
+ withTable("t") {
+ sql(
+ s"CREATE TABLE $DEFAULT_DATABASE.t (id int, name string) TBLPROPERTIES('key1' = 'value1', '${SparkConnectorOptions.BUCKET_NUMBER.key()}' = 3)")
+ var flussTable = admin.getTableInfo(createTablePath("t")).get()
+ assertResult(flussTable.getNumBuckets, "check bucket num")(3)
+ assertResult(
+ Map(
+ ConfigOptions.TABLE_REPLICATION_FACTOR.key() -> "1",
+ ConfigOptions.TABLE_DATALAKE_FORMAT.key() -> "paimon"),
+ "check table properties")(flussTable.getProperties.toMap.asScala)
+ assert(
+ flussTable.getCustomProperties.toMap.asScala.getOrElse("key1", "non-exists") == "value1")
+
+ sql("ALTER TABLE t SET TBLPROPERTIES('key1' = 'value2', 'key2' = 'value2')")
+ flussTable = admin.getTableInfo(createTablePath("t")).get()
+ assertResult(
+ Map(
+ ConfigOptions.TABLE_REPLICATION_FACTOR.key() -> "1",
+ ConfigOptions.TABLE_DATALAKE_FORMAT.key() -> "paimon"),
+ "check table properties")(flussTable.getProperties.toMap.asScala)
+ assert(
+ flussTable.getCustomProperties.toMap.asScala.getOrElse("key1", "non-exists") == "value2")
+ assert(
+ flussTable.getCustomProperties.toMap.asScala.getOrElse("key2", "non-exists") == "value2")
+
+ sql("ALTER TABLE t UNSET TBLPROPERTIES('key1', 'key2')")
+ flussTable = admin.getTableInfo(createTablePath("t")).get()
+ assert(!flussTable.getCustomProperties.toMap.asScala.contains("key1"))
+ assert(!flussTable.getCustomProperties.toMap.asScala.contains("key2"))
+
+ // no error if unset not-exists key
+ sql("ALTER TABLE t UNSET TBLPROPERTIES('key1')")
+
+ sql(
+ s"ALTER TABLE t SET TBLPROPERTIES('${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = 'true')")
+ flussTable = admin.getTableInfo(createTablePath("t")).get()
+ assertResult(
+ Map(
+ ConfigOptions.TABLE_REPLICATION_FACTOR.key() -> "1",
+ ConfigOptions.TABLE_DATALAKE_FORMAT.key() -> "paimon",
+ ConfigOptions.TABLE_DATALAKE_ENABLED.key() -> "true"
+ ),
+ "check table properties"
+ )(flussTable.getProperties.toMap.asScala)
+
+ // Most table properties with prefix of 'table.' are not allowed to be modified.
+ intercept[ExecutionException] {
+ sql(
+ s"ALTER TABLE t SET TBLPROPERTIES('${ConfigOptions.TABLE_REPLICATION_FACTOR.key()}' = '2')")
+ }.getCause.shouldBe(a[InvalidAlterTableException])
+ }
+ }
+
test("Partition: show partitions") {
withTable("t") {
sql(s"CREATE TABLE t (id int, name string, pt1 string, pt2 int) PARTITIONED BY (pt1, pt2)")