This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 087a05bbb [spark] Add sparksql lake table catalog DDL suite for paimon (#2438)
087a05bbb is described below

commit 087a05bbb4c0ecb43b7b6e6c4cea16500721999e
Author: Yang Zhang <[email protected]>
AuthorDate: Wed Jan 28 10:59:33 2026 +0800

    [spark] Add sparksql lake table catalog DDL suite for paimon (#2438)
---
 .../org/apache/fluss/spark/SparkCatalog.scala      |   2 +-
 fluss-spark/fluss-spark-ut/pom.xml                 |  14 +
 ...apache.fluss.lake.lakestorage.LakeStoragePlugin |  19 +
 .../apache/fluss/spark/FlussSparkTestBase.scala    |  64 ++--
 .../org/apache/fluss/spark/SparkCatalogTest.scala  |  37 +-
 .../lake/paimon/SparkLakePaimonCatalogTest.scala   | 415 +++++++++++++++++++++
 6 files changed, 498 insertions(+), 53 deletions(-)

diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala
index cdc2206f2..045063e4f 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala
@@ -74,7 +74,7 @@ class SparkCatalog extends TableCatalog with SupportsFlussNamespaces with WithFl
         } else if (e.getCause.isInstanceOf[TableAlreadyExistException]) {
           throw new TableAlreadyExistsException(ident)
         } else {
-          throw new RuntimeException(e)
+          throw e
         }
     }
   }
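
A note on the SparkCatalog change above: rethrowing the original exception instead of wrapping it in RuntimeException keeps the concrete exception type visible to callers and to test assertions. A minimal sketch of the difference, using a hypothetical FooException:

    class FooException(msg: String) extends RuntimeException(msg)

    // Wrapping: callers see RuntimeException and must unwrap getCause to
    // recover the real type.
    def wrapped(): Unit =
      try throw new FooException("boom")
      catch { case e: Exception => throw new RuntimeException(e) }

    // Rethrowing: callers (and intercept[FooException] in tests) can match
    // on the concrete type directly.
    def rethrown(): Unit =
      try throw new FooException("boom")
      catch { case e: Exception => throw e }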
diff --git a/fluss-spark/fluss-spark-ut/pom.xml b/fluss-spark/fluss-spark-ut/pom.xml
index ffa424d26..96d1ccb45 100644
--- a/fluss-spark/fluss-spark-ut/pom.xml
+++ b/fluss-spark/fluss-spark-ut/pom.xml
@@ -81,6 +81,20 @@
             <artifactId>fluss-test-utils</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.fluss</groupId>
+            <artifactId>fluss-lake-paimon</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-bundle</artifactId>
+            <version>${paimon.version}</version>
+            <scope>test</scope>
+        </dependency>
+
         <!-- for curator TestingServer  -->
         <dependency>
             <groupId>org.apache.curator</groupId>
diff --git a/fluss-spark/fluss-spark-ut/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin b/fluss-spark/fluss-spark-ut/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin
new file mode 100644
index 000000000..69bf0f8a4
--- /dev/null
+++ b/fluss-spark/fluss-spark-ut/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.fluss.lake.paimon.PaimonLakeStoragePlugin
\ No newline at end of file
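
The service file above is a standard java.util.ServiceLoader registration: the file is named after the service interface and lists one implementation class per line. A minimal sketch of how such a registration is discovered at runtime (the loading code here is illustrative, not Fluss's actual lookup):

    import java.util.ServiceLoader
    import scala.collection.JavaConverters._

    import org.apache.fluss.lake.lakestorage.LakeStoragePlugin

    // Scans META-INF/services entries on the classpath and instantiates each
    // listed implementation; the file above makes PaimonLakeStoragePlugin
    // discoverable on the test classpath.
    val plugins = ServiceLoader.load(classOf[LakeStoragePlugin]).asScala.toList
    plugins.foreach(p => println(p.getClass.getName))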
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
index 6dd2c4d86..4bdf5e84d 100644
--- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
@@ -26,10 +26,8 @@ import org.apache.fluss.metadata.{DataLakeFormat, TableDescriptor, TablePath}
 import org.apache.fluss.row.InternalRow
 import org.apache.fluss.server.testutils.FlussClusterExtension
 
-import org.apache.spark.SparkConf
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.test.SharedSparkSession
-import org.junit.jupiter.api.extension.RegisterExtension
 
 import java.time.Duration
 
@@ -37,32 +35,49 @@ import scala.collection.JavaConverters._
 
 class FlussSparkTestBase extends QueryTest with SharedSparkSession {
 
-  import FlussSparkTestBase._
-
   protected val DEFAULT_CATALOG = "fluss_catalog"
   protected val DEFAULT_DATABASE = "fluss"
 
   protected var conn: Connection = _
   protected var admin: Admin = _
 
-  override protected def sparkConf: SparkConf = {
-    super.sparkConf
-      .set(s"spark.sql.catalog.$DEFAULT_CATALOG", 
classOf[SparkCatalog].getName)
-      .set(s"spark.sql.catalog.$DEFAULT_CATALOG.bootstrap.servers", 
bootstrapServers)
-      .set("spark.sql.defaultCatalog", DEFAULT_CATALOG)
-      // Enable read optimized by default temporarily.
-      // TODO: remove this when https://github.com/apache/fluss/issues/2427 is done.
-      .set("spark.sql.fluss.readOptimized", "true")
-  }
+  val flussServer: FlussClusterExtension =
+    FlussClusterExtension.builder
+      .setClusterConf(flussConf)
+      .setNumOfTabletServers(3)
+      .build
 
   override protected def beforeAll(): Unit = {
     super.beforeAll()
-    conn = ConnectionFactory.createConnection(clientConf)
+    flussServer.start()
+    conn = ConnectionFactory.createConnection(flussServer.getClientConfig)
     admin = conn.getAdmin
 
+    spark.conf.set(s"spark.sql.catalog.$DEFAULT_CATALOG", 
classOf[SparkCatalog].getName)
+    spark.conf.set(
+      s"spark.sql.catalog.$DEFAULT_CATALOG.bootstrap.servers",
+      flussServer.getBootstrapServers)
+    spark.conf.set("spark.sql.defaultCatalog", DEFAULT_CATALOG)
+    // Enable read optimized by default temporarily.
+    // TODO: remove this when https://github.com/apache/fluss/issues/2427 is done.
+    spark.conf.set("spark.sql.fluss.readOptimized", "true")
+
     sql(s"USE $DEFAULT_DATABASE")
   }
 
+  override protected def afterAll(): Unit = {
+    super.afterAll()
+    if (admin != null) {
+      admin.close()
+      admin = null
+    }
+    if (conn != null) {
+      conn.close()
+      conn = null
+    }
+    flussServer.close()
+  }
+
   def createTablePath(tableName: String): TablePath = {
     TablePath.of(DEFAULT_DATABASE, tableName)
   }
@@ -98,22 +113,9 @@ class FlussSparkTestBase extends QueryTest with SharedSparkSession {
       .map(record => (record.getChangeType.shortString(), record.getRow))
       .toArray
   }
-}
-
-@RegisterExtension
-object FlussSparkTestBase {
-  val FLUSS_CLUSTER_EXTENSION: FlussClusterExtension =
-    FlussClusterExtension.builder
-      .setClusterConf(
-        new Configuration()
-          .set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1))
-          .set(ConfigOptions.DATALAKE_FORMAT, DataLakeFormat.PAIMON)
-      )
-      .setNumOfTabletServers(3)
-      .build
 
-  FLUSS_CLUSTER_EXTENSION.start()
-
-  val clientConf: Configuration = FLUSS_CLUSTER_EXTENSION.getClientConfig
-  val bootstrapServers: String = FLUSS_CLUSTER_EXTENSION.getBootstrapServers
+  protected def flussConf: Configuration = {
+    val conf = new Configuration
+    conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1))
+  }
 }
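
The FlussSparkTestBase refactor above replaces the statically started cluster in the companion object with an instance-level FlussClusterExtension built from the overridable flussConf hook, so a subclass can tune the cluster configuration before beforeAll() starts it. A minimal sketch of the extension point (subclass name and option value hypothetical):

    class CustomConfSuite extends FlussSparkTestBase {
      // flussServer is built from this method in the base class, so the
      // override takes effect before the cluster starts.
      override protected def flussConf: Configuration = {
        val conf = super.flussConf
        conf.setString("datalake.format", "paimon")
        conf
      }
    }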
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala
index 242e7998e..e30b03baa 100644
--- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala
@@ -34,6 +34,8 @@ import scala.collection.JavaConverters._
 
 class SparkCatalogTest extends FlussSparkTestBase {
 
+  protected def lakeFormat: Option[DataLakeFormat] = None
+
   test("Catalog: namespaces") {
     // Always a default database 'fluss'.
     checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) :: Nil)
@@ -197,27 +199,28 @@ class SparkCatalogTest extends FlussSparkTestBase {
     checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) :: Nil)
   }
 
-  test("Catalog: set/remove table properties") {
+  protected def modifyTablePropertiesWithCheck(): Unit = {
     withTable("t") {
       sql(
         s"CREATE TABLE $DEFAULT_DATABASE.t (id int, name string) 
TBLPROPERTIES('key1' = 'value1', '${SparkConnectorOptions.BUCKET_NUMBER.key()}' 
= 3)")
       var flussTable = admin.getTableInfo(createTablePath("t")).get()
       assertResult(flussTable.getNumBuckets, "check bucket num")(3)
-      assertResult(
+      val expectTableProperties = if (lakeFormat.isDefined) {
         Map(
           ConfigOptions.TABLE_REPLICATION_FACTOR.key() -> "1",
-          ConfigOptions.TABLE_DATALAKE_FORMAT.key() -> "paimon"),
-        "check table properties")(flussTable.getProperties.toMap.asScala)
+          ConfigOptions.TABLE_DATALAKE_FORMAT.key() -> lakeFormat.get.toString)
+      } else {
+        Map(ConfigOptions.TABLE_REPLICATION_FACTOR.key() -> "1")
+      }
+      assertResult(expectTableProperties, "check table properties")(
+        flussTable.getProperties.toMap.asScala)
       assert(
        flussTable.getCustomProperties.toMap.asScala.getOrElse("key1", "non-exists") == "value1")
 
       sql("ALTER TABLE t SET TBLPROPERTIES('key1' = 'value2', 'key2' = 
'value2')")
       flussTable = admin.getTableInfo(createTablePath("t")).get()
-      assertResult(
-        Map(
-          ConfigOptions.TABLE_REPLICATION_FACTOR.key() -> "1",
-          ConfigOptions.TABLE_DATALAKE_FORMAT.key() -> "paimon"),
-        "check table properties")(flussTable.getProperties.toMap.asScala)
+      assertResult(expectTableProperties, "check table properties")(
+        flussTable.getProperties.toMap.asScala)
       assert(
        flussTable.getCustomProperties.toMap.asScala.getOrElse("key1", "non-exists") == "value2")
       assert(
@@ -231,18 +234,6 @@ class SparkCatalogTest extends FlussSparkTestBase {
       // no error if unset not-exists key
       sql("ALTER TABLE t UNSET TBLPROPERTIES('key1')")
 
-      sql(
-        s"ALTER TABLE t SET 
TBLPROPERTIES('${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = 'true')")
-      flussTable = admin.getTableInfo(createTablePath("t")).get()
-      assertResult(
-        Map(
-          ConfigOptions.TABLE_REPLICATION_FACTOR.key() -> "1",
-          ConfigOptions.TABLE_DATALAKE_FORMAT.key() -> "paimon",
-          ConfigOptions.TABLE_DATALAKE_ENABLED.key() -> "true"
-        ),
-        "check table properties"
-      )(flussTable.getProperties.toMap.asScala)
-
      // Most table properties with prefix of 'table.' are not allowed to be modified.
       intercept[ExecutionException] {
         sql(
@@ -251,6 +242,10 @@ class SparkCatalogTest extends FlussSparkTestBase {
     }
   }
 
+  test("Catalog: set/remove table properties") {
+    modifyTablePropertiesWithCheck()
+  }
+
   test("Partition: show partitions") {
     withTable("t") {
       sql(s"CREATE TABLE t (id int, name string, pt1 string, pt2 int) 
PARTITIONED BY (pt1, pt2)")
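
Extracting modifyTablePropertiesWithCheck together with the lakeFormat hook turns the property test into a shared template: the expected table properties adapt to whether a lake format is configured, so lake-specific suites can reuse the same body. A minimal sketch of how a subclass plugs in (suite name hypothetical):

    class SomeLakeCatalogSuite extends SparkCatalogTest {
      // With a lake format set, the inherited "Catalog: set/remove table
      // properties" test also expects table.datalake.format among the
      // table properties.
      override protected def lakeFormat: Option[DataLakeFormat] =
        Some(DataLakeFormat.PAIMON)
    }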
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonCatalogTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonCatalogTest.scala
new file mode 100644
index 000000000..d5a29e163
--- /dev/null
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonCatalogTest.scala
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.lake.paimon
+
+import org.apache.fluss.config.{ConfigOptions, Configuration, FlussConfigUtils}
+import org.apache.fluss.exception.FlussRuntimeException
+import org.apache.fluss.lake.paimon.utils.PaimonConversions
+import org.apache.fluss.metadata._
+import org.apache.fluss.metadata.TableDescriptor.{BUCKET_COLUMN_NAME, OFFSET_COLUMN_NAME, TIMESTAMP_COLUMN_NAME}
+import org.apache.fluss.server.utils.LakeStorageUtils
+import org.apache.fluss.spark.SparkCatalogTest
+import org.apache.fluss.spark.SparkConnectorOptions.{BUCKET_KEY, BUCKET_NUMBER, PRIMARY_KEY}
+
+import org.scalatest.matchers.must.Matchers.contain
+import org.scalatest.matchers.should.Matchers.{a, convertToAnyShouldWrapper}
+
+import java.nio.file.Files
+
+import scala.collection.JavaConverters._
+
+class SparkLakePaimonCatalogTest extends SparkCatalogTest {
+
+  private var paimonCatalog: org.apache.paimon.catalog.Catalog = _
+  private var warehousePath: String = _
+
+  override protected def lakeFormat: Option[DataLakeFormat] = Some(DataLakeFormat.PAIMON)
+
+  override def flussConf: Configuration = {
+    val conf = super.flussConf
+    conf.setInt(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, Integer.MAX_VALUE)
+    conf.setString("datalake.format", lakeFormat.get.toString)
+    conf.setString("datalake.paimon.metastore", "filesystem")
+    conf.setString("datalake.paimon.cache-enabled", "false")
+    try {
+      warehousePath =
+        Files.createTempDirectory("fluss-testing-datalake-tiered").resolve("warehouse").toString
+    } catch {
+      case e: Exception =>
+        throw new FlussRuntimeException("Failed to create warehouse path")
+    }
+    conf.setString("datalake.paimon.warehouse", warehousePath)
+    paimonCatalog = org.apache.paimon.catalog.CatalogFactory.createCatalog(
+      org.apache.paimon.catalog.CatalogContext.create(
+        org.apache.paimon.options.Options.fromMap(LakeStorageUtils.extractLakeProperties(conf))))
+    conf
+  }
+
+  override protected def withTable(tableNames: String*)(f: => Unit): Unit = {
+    super.withTable(tableNames: _*)(f)
+    tableNames.foreach(
+      tableName =>
+        paimonCatalog.dropTable(
+          org.apache.paimon.catalog.Identifier.create(DEFAULT_DATABASE, tableName),
+          true))
+  }
+
+  private def verifyLakePaimonTable(
+      paimonTable: org.apache.paimon.table.Table,
+      flussTable: TableInfo,
+      expectedPaimonRowType: org.apache.paimon.types.RowType,
+      expectedBucketKey: String,
+      bucketNum: Int): Unit = {
+    if (!flussTable.hasPrimaryKey) {
+      assert(paimonTable.primaryKeys.isEmpty)
+    } else {
+      assertResult(flussTable.getSchema.getPrimaryKey.get().getColumnNames, "check primary key")(
+        paimonTable.primaryKeys())
+    }
+
+    if (flussTable.isPartitioned) {
+      assertResult(flussTable.getPartitionKeys, "check partition keys")(paimonTable.partitionKeys())
+    }
+
+    assert(flussTable.getNumBuckets == bucketNum)
+
+    if (expectedBucketKey != null) {
+      assert(
+        paimonTable
+          .options()
+          .asScala
+          .getOrElse(
+            org.apache.paimon.CoreOptions.BUCKET.key(),
+            org.apache.paimon.CoreOptions.BUCKET.defaultValue().toString)
+          .toInt == bucketNum)
+    }
+
+    if (flussTable.hasBucketKey) {
+      assertResult(expectedBucketKey, "check fluss table bucket key")(
+        flussTable.getBucketKeys.asScala.mkString(","))
+      assertResult(expectedBucketKey, "check paimon table bucket key")(
+        paimonTable.options().get(org.apache.paimon.CoreOptions.BUCKET_KEY.key()))
+    }
+
+    val expectedProperties =
+      (flussTable.getProperties.toMap.asScala ++ flussTable.getCustomProperties.toMap.asScala)
+        .filterNot(_._1.startsWith(FlussConfigUtils.TABLE_PREFIX + "datalake."))
+        .map {
+          case (k, v) =>
+            if (k.startsWith("paimon.")) {
+              (k.substring("paimon.".length), v)
+            } else {
+              (s"fluss.$k", v)
+            }
+        }
+        .toMap
+    paimonTable.options().asScala.should(contain).allElementsOf(expectedProperties)
+
+    val paimonRowType = paimonTable.rowType
+    assert(paimonRowType.getFieldCount == expectedPaimonRowType.getFieldCount)
+    paimonRowType.getFields.asScala.zip(expectedPaimonRowType.getFields.asScala).foreach {
+      case (actual, expect) =>
+        assert(actual.equalsIgnoreFieldId(expect), s"check table schema: $actual vs $expect")
+    }
+
+    assert(flussTable.getComment.equals(paimonTable.comment()), "check table comments")
+  }
+
+  test("Lake Catalog: basic table") {
+    // bucket log table
+    var tableName = "bucket_log_table"
+    withTable(tableName) {
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.$tableName (id int, name string)
+             | TBLPROPERTIES (
+             |  '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true, '${BUCKET_KEY.key()}' = 'id',
+             |  '${BUCKET_NUMBER.key()}' = 2, 'k1' = 'v1', 'paimon.file.format' = 'parquet')
+             |""".stripMargin)
+
+      val flussTable = admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, tableName)).get()
+      val paimonTable =
+        paimonCatalog.getTable(
+          org.apache.paimon.catalog.Identifier.create(DEFAULT_DATABASE, tableName))
+      verifyLakePaimonTable(
+        paimonTable,
+        flussTable,
+        org.apache.paimon.types.RowType
+          .of(
+            Array.apply(
+              org.apache.paimon.types.DataTypes.INT,
+              org.apache.paimon.types.DataTypes.STRING,
+              org.apache.paimon.types.DataTypes.INT,
+              org.apache.paimon.types.DataTypes.BIGINT,
+              org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS
+            ),
+            Array.apply("id", "name", BUCKET_COLUMN_NAME, OFFSET_COLUMN_NAME, 
TIMESTAMP_COLUMN_NAME)
+          ),
+        "id",
+        2
+      )
+    }
+
+    // non-bucket log table
+    tableName = "non_bucket_log_table"
+    withTable(tableName) {
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.$tableName (id int, name string)
+             | TBLPROPERTIES (
+             |  '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true,
+             |  '${BUCKET_NUMBER.key()}' = 2, 'k1' = 'v1', 'paimon.file.format' = 'parquet')
+             |""".stripMargin)
+
+      val flussTable = admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, tableName)).get()
+      val paimonTable =
+        paimonCatalog.getTable(
+          org.apache.paimon.catalog.Identifier.create(DEFAULT_DATABASE, tableName))
+      verifyLakePaimonTable(
+        paimonTable,
+        flussTable,
+        org.apache.paimon.types.RowType
+          .of(
+            Array.apply(
+              org.apache.paimon.types.DataTypes.INT,
+              org.apache.paimon.types.DataTypes.STRING,
+              org.apache.paimon.types.DataTypes.INT,
+              org.apache.paimon.types.DataTypes.BIGINT,
+              org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS
+            ),
+            Array.apply("id", "name", BUCKET_COLUMN_NAME, OFFSET_COLUMN_NAME, 
TIMESTAMP_COLUMN_NAME)
+          ),
+        null,
+        2
+      )
+    }
+
+    // pk table
+    tableName = "pk_table"
+    withTable(tableName) {
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.$tableName (id int, name string, pk1 string)
+             | TBLPROPERTIES (
+             |  '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true, '${BUCKET_KEY.key()}' = 'id',
+             |  '${PRIMARY_KEY.key()}' = 'pk1, id',
+             |  '${BUCKET_NUMBER.key()}' = 2, 'k1' = 'v1', 'paimon.file.format' = 'parquet')
+             |""".stripMargin)
+
+      val flussTable = admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, tableName)).get()
+      val paimonTable =
+        paimonCatalog.getTable(
+          org.apache.paimon.catalog.Identifier.create(DEFAULT_DATABASE, tableName))
+      verifyLakePaimonTable(
+        paimonTable,
+        flussTable,
+        org.apache.paimon.types.RowType
+          .of(
+            Array.apply(
+              org.apache.paimon.types.DataTypes.INT.notNull(),
+              org.apache.paimon.types.DataTypes.STRING,
+              org.apache.paimon.types.DataTypes.STRING.notNull(),
+              org.apache.paimon.types.DataTypes.INT,
+              org.apache.paimon.types.DataTypes.BIGINT,
+              org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS
+            ),
+            Array.apply(
+              "id",
+              "name",
+              "pk1",
+              BUCKET_COLUMN_NAME,
+              OFFSET_COLUMN_NAME,
+              TIMESTAMP_COLUMN_NAME)
+          ),
+        "id",
+        2
+      )
+    }
+
+    // partitioned pk table
+    tableName = "partitioned_pk_table"
+    withTable(tableName) {
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.$tableName (id int, name string, pk1 string, pt1 string)
+             | PARTITIONED BY (pt1)
+             | TBLPROPERTIES (
+             |  '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true, '${BUCKET_KEY.key()}' = 'id',
+             |  '${PRIMARY_KEY.key()}' = 'pk1, id, pt1',
+             |  '${BUCKET_NUMBER.key()}' = 2, 'k1' = 'v1', 'paimon.file.format' = 'parquet')
+             |""".stripMargin)
+
+      val flussTable = admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, tableName)).get()
+      val paimonTable =
+        paimonCatalog.getTable(
+          org.apache.paimon.catalog.Identifier.create(DEFAULT_DATABASE, tableName))
+      verifyLakePaimonTable(
+        paimonTable,
+        flussTable,
+        org.apache.paimon.types.RowType
+          .of(
+            Array.apply(
+              org.apache.paimon.types.DataTypes.INT.notNull(),
+              org.apache.paimon.types.DataTypes.STRING,
+              org.apache.paimon.types.DataTypes.STRING.notNull(),
+              org.apache.paimon.types.DataTypes.STRING.notNull(),
+              org.apache.paimon.types.DataTypes.INT,
+              org.apache.paimon.types.DataTypes.BIGINT,
+              org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS
+            ),
+            Array.apply(
+              "id",
+              "name",
+              "pk1",
+              "pt1",
+              BUCKET_COLUMN_NAME,
+              OFFSET_COLUMN_NAME,
+              TIMESTAMP_COLUMN_NAME)
+          ),
+        "id",
+        2
+      )
+    }
+  }
+
+  test("Lake Catalog: table with all types") {
+    withTable("t") {
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.t
+             | (c1 boolean, c2 byte, c3 short, c4 int, c5 long, c6 float, c7 double, c8 date,
+             |  c9 timestamp, c10 timestamp_ntz, c11 string, c12 binary, c13 decimal(10, 2),
+             |  c14 array<int>, c15 struct<a int, b string>, c16 map<string, 
int>)
+             | TBLPROPERTIES (
+             |  '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true,
+             |  '${BUCKET_NUMBER.key()}' = 2, 'k1' = 'v1', 'paimon.file.format' = 'parquet')
+             |""".stripMargin)
+
+      val flussTable = admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, "t")).get()
+      val paimonTable =
+        paimonCatalog.getTable(org.apache.paimon.catalog.Identifier.create(DEFAULT_DATABASE, "t"))
+      verifyLakePaimonTable(
+        paimonTable,
+        flussTable,
+        org.apache.paimon.types.RowType
+          .of(
+            Array.apply(
+              org.apache.paimon.types.DataTypes.BOOLEAN,
+              org.apache.paimon.types.DataTypes.TINYINT,
+              org.apache.paimon.types.DataTypes.SMALLINT,
+              org.apache.paimon.types.DataTypes.INT,
+              org.apache.paimon.types.DataTypes.BIGINT,
+              org.apache.paimon.types.DataTypes.FLOAT,
+              org.apache.paimon.types.DataTypes.DOUBLE,
+              org.apache.paimon.types.DataTypes.DATE,
+              org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
+              org.apache.paimon.types.DataTypes.TIMESTAMP,
+              org.apache.paimon.types.DataTypes.STRING,
+              org.apache.paimon.types.DataTypes.BYTES,
+              org.apache.paimon.types.DataTypes.DECIMAL(10, 2),
+              org.apache.paimon.types.DataTypes.ARRAY(org.apache.paimon.types.DataTypes.INT),
+              org.apache.paimon.types.DataTypes.ROW(
+                org.apache.paimon.types.DataTypes
+                  .FIELD(0, "a", org.apache.paimon.types.DataTypes.INT),
+                org.apache.paimon.types.DataTypes
+                  .FIELD(1, "b", org.apache.paimon.types.DataTypes.STRING)
+              ),
+              org.apache.paimon.types.DataTypes.MAP(
+                org.apache.paimon.types.DataTypes.STRING.notNull(),
+                org.apache.paimon.types.DataTypes.INT),
+              org.apache.paimon.types.DataTypes.INT,
+              org.apache.paimon.types.DataTypes.BIGINT,
+              org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS
+            ),
+            Array.apply(
+              "c1",
+              "c2",
+              "c3",
+              "c4",
+              "c5",
+              "c6",
+              "c7",
+              "c8",
+              "c9",
+              "c10",
+              "c11",
+              "c12",
+              "c13",
+              "c14",
+              "c15",
+              "c16",
+              BUCKET_COLUMN_NAME,
+              OFFSET_COLUMN_NAME,
+              TIMESTAMP_COLUMN_NAME)
+          ),
+        null,
+        2
+      )
+    }
+  }
+
+  test("Lake Catalog: unsettable properties") {
+    withTable("t") {
+      val unsettableProperties =
+        PaimonConversions.PAIMON_UNSETTABLE_OPTIONS.asScala.map(e => s"'$e' = 'v'").mkString(", ")
+
+      intercept[java.util.concurrent.ExecutionException] {
+        sql(s"""
+               |CREATE TABLE $DEFAULT_DATABASE.t (id int, name string)
+               | TBLPROPERTIES (
+               |  '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true,
+               |  $unsettableProperties,
+               |  '${BUCKET_NUMBER.key()}' = 2, 'k1' = 'v1', 'paimon.file.format' = 'parquet')
+               |""".stripMargin)
+      }.getCause.shouldBe(a[org.apache.fluss.exception.InvalidConfigException])
+    }
+  }
+
+  test("Lake Catalog: alter table with lake enabled") {
+    withTable("t") {
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.t (id int, name string)
+             | TBLPROPERTIES (
+             |  '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = false,
+             |  '${BUCKET_NUMBER.key()}' = 2, 'k1' = 'v1', 'paimon.file.format' = 'parquet')
+             |""".stripMargin)
+      intercept[org.apache.paimon.catalog.Catalog.TableNotExistException] {
+        paimonCatalog.getTable(org.apache.paimon.catalog.Identifier.create(DEFAULT_DATABASE, "t"))
+      }
+
+      sql(
+        s"ALTER TABLE $DEFAULT_DATABASE.t SET TBLPROPERTIES 
('${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true)")
+
+      val flussTable = admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, "t")).get()
+      val paimonTable =
+        paimonCatalog.getTable(org.apache.paimon.catalog.Identifier.create(DEFAULT_DATABASE, "t"))
+      verifyLakePaimonTable(
+        paimonTable,
+        flussTable,
+        org.apache.paimon.types.RowType
+          .of(
+            Array.apply(
+              org.apache.paimon.types.DataTypes.INT,
+              org.apache.paimon.types.DataTypes.STRING,
+              org.apache.paimon.types.DataTypes.INT,
+              org.apache.paimon.types.DataTypes.BIGINT,
+              org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS
+            ),
+            Array.apply("id", "name", BUCKET_COLUMN_NAME, OFFSET_COLUMN_NAME, 
TIMESTAMP_COLUMN_NAME)
+          ),
+        null,
+        2
+      )
+    }
+  }
+}
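
For orientation, the DDL path the new suite exercises reduces to the sketch below (property keys taken from the suite; the table name is hypothetical). Creating a table with datalake enabled should also materialize a matching Paimon table, which the tests cross-check via admin.getTableInfo on the Fluss side and paimonCatalog.getTable on the Paimon side.

    // Sketch of the CREATE path under test, mirroring the suite above.
    sql(s"""
           |CREATE TABLE $DEFAULT_DATABASE.lake_t (id int, name string)
           | TBLPROPERTIES (
           |  '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true,
           |  '${BUCKET_NUMBER.key()}' = 2, 'paimon.file.format' = 'parquet')
           |""".stripMargin)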
