This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 9592d0391 [spark] Add Spark SQL ALTER table support for set/remove table properties (#2437)
9592d0391 is described below

commit 9592d0391437de105bae0e760c103ba0c35bc014
Author: Yang Zhang <[email protected]>
AuthorDate: Mon Jan 26 16:05:11 2026 +0800

    [spark] Add Spark SQL ALTER table support for set/remove table properties (#2437)
---
 .../org/apache/fluss/spark/SparkCatalog.scala      | 14 ++++-
 .../org/apache/fluss/spark/SparkConversions.scala  | 18 +++++--
 .../apache/fluss/spark/FlussSparkTestBase.scala    |  3 +-
 .../org/apache/fluss/spark/SparkCatalogTest.scala  | 59 ++++++++++++++++++++++
 4 files changed, 89 insertions(+), 5 deletions(-)

diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala
index 842ef9b39..cdc2206f2 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala
@@ -80,7 +80,19 @@ class SparkCatalog extends TableCatalog with SupportsFlussNamespaces with WithFl
   }
 
   override def alterTable(ident: Identifier, changes: TableChange*): Table = {
-    throw new UnsupportedOperationException("Altering table is not supported")
+    try {
+      admin
+        .alterTable(toTablePath(ident), 
SparkConversions.toFlussTableChanges(changes).asJava, false)
+        .get()
+      loadTable(ident)
+    } catch {
+      case e: ExecutionException =>
+        if (e.getCause.isInstanceOf[TableNotExistException]) {
+          throw new NoSuchTableException(ident)
+        } else {
+          throw e
+        }
+    }
   }
 
   override def dropTable(ident: Identifier): Boolean = {
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConversions.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConversions.scala
index f85e33f81..eeb726678 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConversions.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConversions.scala
@@ -25,6 +25,7 @@ import org.apache.fluss.types.RowType
 
 import org.apache.spark.sql.FlussIdentityTransform
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.connector.catalog.TableChange
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.types.StructType
 
@@ -54,7 +55,7 @@ object SparkConversions {
     tableDescriptorBuilder.partitionedBy(partitionKey: _*)
 
     val primaryKeys = if (caseInsensitiveProps.contains(PRIMARY_KEY.key)) {
-      val pks = caseInsensitiveProps.get(PRIMARY_KEY.key).get.split(",")
+      val pks = 
caseInsensitiveProps.get(PRIMARY_KEY.key).get.split(",").map(_.trim)
       schemaBuilder.primaryKey(pks: _*)
       pks
     } else {
@@ -64,7 +65,7 @@ object SparkConversions {
     if (caseInsensitiveProps.contains(BUCKET_NUMBER.key)) {
       val bucketNum = caseInsensitiveProps.get(BUCKET_NUMBER.key).get.toInt
       val bucketKeys = if (caseInsensitiveProps.contains(BUCKET_KEY.key)) {
-        caseInsensitiveProps.get(BUCKET_KEY.key).get.split(",")
+        caseInsensitiveProps.get(BUCKET_KEY.key).get.split(",").map(_.trim)
       } else {
         primaryKeys.filterNot(partitionKey.contains)
       }
@@ -76,7 +77,7 @@ object SparkConversions {
     }
 
     val (tableProps, customProps) =
-      caseInsensitiveProps.filterNot(SPARK_TABLE_OPTIONS.contains).partition {
+      caseInsensitiveProps.filterNot(e => 
SPARK_TABLE_OPTIONS.contains(e._1)).partition {
         case (key, _) => key.startsWith(FlussConfigUtils.TABLE_PREFIX)
       }
 
@@ -97,4 +98,15 @@ object SparkConversions {
     }
     partitionKeys.toArray
   }
+
+  def toFlussTableChanges(changes: Seq[TableChange]): 
Seq[org.apache.fluss.metadata.TableChange] = {
+    changes.map {
+      case p: TableChange.SetProperty =>
+        org.apache.fluss.metadata.TableChange.set(p.property(), p.value())
+      case p: TableChange.RemoveProperty =>
+        org.apache.fluss.metadata.TableChange.reset(p.property())
+      // TODO Add full support for table changes
+      case _ => throw new UnsupportedOperationException("Unsupported table 
change")
+    }
+  }
 }
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
index f9b98fe7c..6dd2c4d86 100644
--- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
@@ -22,7 +22,7 @@ import org.apache.fluss.client.admin.Admin
 import org.apache.fluss.client.table.Table
 import org.apache.fluss.client.table.scanner.log.LogScanner
 import org.apache.fluss.config.{ConfigOptions, Configuration}
-import org.apache.fluss.metadata.{TableDescriptor, TablePath}
+import org.apache.fluss.metadata.{DataLakeFormat, TableDescriptor, TablePath}
 import org.apache.fluss.row.InternalRow
 import org.apache.fluss.server.testutils.FlussClusterExtension
 
@@ -107,6 +107,7 @@ object FlussSparkTestBase {
       .setClusterConf(
         new Configuration()
           .set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1))
+          .set(ConfigOptions.DATALAKE_FORMAT, DataLakeFormat.PAIMON)
       )
       .setNumOfTabletServers(3)
       .build
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala
index 4c4bec355..242e7998e 100644
--- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala
@@ -17,6 +17,8 @@
 
 package org.apache.fluss.spark
 
+import org.apache.fluss.config.ConfigOptions
+import org.apache.fluss.exception.InvalidAlterTableException
 import org.apache.fluss.metadata._
 import org.apache.fluss.types.{DataTypes, RowType}
 
@@ -24,6 +26,9 @@ import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.assertj.core.api.Assertions.{assertThat, assertThatList}
+import org.scalatest.matchers.should.Matchers.{a, convertToAnyShouldWrapper}
+
+import java.util.concurrent.ExecutionException
 
 import scala.collection.JavaConverters._
 
@@ -192,6 +197,60 @@ class SparkCatalogTest extends FlussSparkTestBase {
     checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) :: Nil)
   }
 
+  test("Catalog: set/remove table properties") {
+    withTable("t") {
+      sql(
+        s"CREATE TABLE $DEFAULT_DATABASE.t (id int, name string) 
TBLPROPERTIES('key1' = 'value1', '${SparkConnectorOptions.BUCKET_NUMBER.key()}' 
= 3)")
+      var flussTable = admin.getTableInfo(createTablePath("t")).get()
+      assertResult(flussTable.getNumBuckets, "check bucket num")(3)
+      assertResult(
+        Map(
+          ConfigOptions.TABLE_REPLICATION_FACTOR.key() -> "1",
+          ConfigOptions.TABLE_DATALAKE_FORMAT.key() -> "paimon"),
+        "check table properties")(flussTable.getProperties.toMap.asScala)
+      assert(
+        flussTable.getCustomProperties.toMap.asScala.getOrElse("key1", 
"non-exists") == "value1")
+
+      sql("ALTER TABLE t SET TBLPROPERTIES('key1' = 'value2', 'key2' = 
'value2')")
+      flussTable = admin.getTableInfo(createTablePath("t")).get()
+      assertResult(
+        Map(
+          ConfigOptions.TABLE_REPLICATION_FACTOR.key() -> "1",
+          ConfigOptions.TABLE_DATALAKE_FORMAT.key() -> "paimon"),
+        "check table properties")(flussTable.getProperties.toMap.asScala)
+      assert(
+        flussTable.getCustomProperties.toMap.asScala.getOrElse("key1", 
"non-exists") == "value2")
+      assert(
+        flussTable.getCustomProperties.toMap.asScala.getOrElse("key2", 
"non-exists") == "value2")
+
+      sql("ALTER TABLE t UNSET TBLPROPERTIES('key1', 'key2')")
+      flussTable = admin.getTableInfo(createTablePath("t")).get()
+      assert(!flussTable.getCustomProperties.toMap.asScala.contains("key1"))
+      assert(!flussTable.getCustomProperties.toMap.asScala.contains("key2"))
+
+      // no error if unset not-exists key
+      sql("ALTER TABLE t UNSET TBLPROPERTIES('key1')")
+
+      sql(
+        s"ALTER TABLE t SET 
TBLPROPERTIES('${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = 'true')")
+      flussTable = admin.getTableInfo(createTablePath("t")).get()
+      assertResult(
+        Map(
+          ConfigOptions.TABLE_REPLICATION_FACTOR.key() -> "1",
+          ConfigOptions.TABLE_DATALAKE_FORMAT.key() -> "paimon",
+          ConfigOptions.TABLE_DATALAKE_ENABLED.key() -> "true"
+        ),
+        "check table properties"
+      )(flussTable.getProperties.toMap.asScala)
+
+      // Most table properties with prefix of 'table.' are not allowed to be modified.
+      intercept[ExecutionException] {
+        sql(
+          s"ALTER TABLE t SET 
TBLPROPERTIES('${ConfigOptions.TABLE_REPLICATION_FACTOR.key()}' = '2')")
+      }.getCause.shouldBe(a[InvalidAlterTableException])
+    }
+  }
+
   test("Partition: show partitions") {
     withTable("t") {
       sql(s"CREATE TABLE t (id int, name string, pt1 string, pt2 int) 
PARTITIONED BY (pt1, pt2)")

Reply via email to