This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 087a05bbb [spark] Add sparksql lake table catalog DDL suite for paimon
(#2438)
087a05bbb is described below
commit 087a05bbb4c0ecb43b7b6e6c4cea16500721999e
Author: Yang Zhang <[email protected]>
AuthorDate: Wed Jan 28 10:59:33 2026 +0800
[spark] Add sparksql lake table catalog DDL suite for paimon (#2438)
---
.../org/apache/fluss/spark/SparkCatalog.scala | 2 +-
fluss-spark/fluss-spark-ut/pom.xml | 14 +
...apache.fluss.lake.lakestorage.LakeStoragePlugin | 19 +
.../apache/fluss/spark/FlussSparkTestBase.scala | 64 ++--
.../org/apache/fluss/spark/SparkCatalogTest.scala | 37 +-
.../lake/paimon/SparkLakePaimonCatalogTest.scala | 415 +++++++++++++++++++++
6 files changed, 498 insertions(+), 53 deletions(-)
diff --git
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala
index cdc2206f2..045063e4f 100644
---
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala
+++
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala
@@ -74,7 +74,7 @@ class SparkCatalog extends TableCatalog with
SupportsFlussNamespaces with WithFl
} else if (e.getCause.isInstanceOf[TableAlreadyExistException]) {
throw new TableAlreadyExistsException(ident)
} else {
- throw new RuntimeException(e)
+ throw e
}
}
}
diff --git a/fluss-spark/fluss-spark-ut/pom.xml
b/fluss-spark/fluss-spark-ut/pom.xml
index ffa424d26..96d1ccb45 100644
--- a/fluss-spark/fluss-spark-ut/pom.xml
+++ b/fluss-spark/fluss-spark-ut/pom.xml
@@ -81,6 +81,20 @@
<artifactId>fluss-test-utils</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.fluss</groupId>
+ <artifactId>fluss-lake-paimon</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-bundle</artifactId>
+ <version>${paimon.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<!-- for curator TestingServer -->
<dependency>
<groupId>org.apache.curator</groupId>
diff --git
a/fluss-spark/fluss-spark-ut/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin
b/fluss-spark/fluss-spark-ut/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin
new file mode 100644
index 000000000..69bf0f8a4
--- /dev/null
+++
b/fluss-spark/fluss-spark-ut/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.fluss.lake.paimon.PaimonLakeStoragePlugin
\ No newline at end of file
diff --git
a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
index 6dd2c4d86..4bdf5e84d 100644
---
a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
+++
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
@@ -26,10 +26,8 @@ import org.apache.fluss.metadata.{DataLakeFormat,
TableDescriptor, TablePath}
import org.apache.fluss.row.InternalRow
import org.apache.fluss.server.testutils.FlussClusterExtension
-import org.apache.spark.SparkConf
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.test.SharedSparkSession
-import org.junit.jupiter.api.extension.RegisterExtension
import java.time.Duration
@@ -37,32 +35,49 @@ import scala.collection.JavaConverters._
class FlussSparkTestBase extends QueryTest with SharedSparkSession {
- import FlussSparkTestBase._
-
protected val DEFAULT_CATALOG = "fluss_catalog"
protected val DEFAULT_DATABASE = "fluss"
protected var conn: Connection = _
protected var admin: Admin = _
- override protected def sparkConf: SparkConf = {
- super.sparkConf
- .set(s"spark.sql.catalog.$DEFAULT_CATALOG",
classOf[SparkCatalog].getName)
- .set(s"spark.sql.catalog.$DEFAULT_CATALOG.bootstrap.servers",
bootstrapServers)
- .set("spark.sql.defaultCatalog", DEFAULT_CATALOG)
- // Enable read optimized by default temporarily.
- // TODO: remove this when https://github.com/apache/fluss/issues/2427 is
done.
- .set("spark.sql.fluss.readOptimized", "true")
- }
+  /**
+   * The Fluss cluster shared by all tests of this suite, built from [[flussConf]].
+   *
+   * Declared `lazy` so that `flussConf` (which subclasses override, possibly assigning their own
+   * fields) is evaluated on first access in `beforeAll`, not while this base class is still being
+   * constructed and subclass state is not yet initialized.
+   */
+  lazy val flussServer: FlussClusterExtension =
+    FlussClusterExtension.builder
+      .setClusterConf(flussConf)
+      .setNumOfTabletServers(3)
+      .build
+ /**
+ * Starts the Fluss cluster, opens the shared client connection and admin, registers the Fluss
+ * Spark catalog as the session's default catalog and switches to the default database.
+ */
override protected def beforeAll(): Unit = {
super.beforeAll()
- conn = ConnectionFactory.createConnection(clientConf)
+ flussServer.start()
+ conn = ConnectionFactory.createConnection(flussServer.getClientConfig)
admin = conn.getAdmin
+ // Registered at runtime, after start(), so the bootstrap servers can be read from the
+ // running cluster.
+ spark.conf.set(s"spark.sql.catalog.$DEFAULT_CATALOG",
classOf[SparkCatalog].getName)
+ spark.conf.set(
+ s"spark.sql.catalog.$DEFAULT_CATALOG.bootstrap.servers",
+ flussServer.getBootstrapServers)
+ spark.conf.set("spark.sql.defaultCatalog", DEFAULT_CATALOG)
+ // Enable read optimized by default temporarily.
+ // TODO: remove this when https://github.com/apache/fluss/issues/2427 is done.
+ spark.conf.set("spark.sql.fluss.readOptimized", "true")
+
sql(s"USE $DEFAULT_DATABASE")
}
+  /**
+   * Releases what the suite opened: first the parent suite's resources, then the Fluss admin, the
+   * client connection and finally the Fluss cluster.
+   *
+   * Each step runs in a `finally` of the previous one, so a failure while closing one resource
+   * does not leak the ones after it; the first failure is still propagated.
+   */
+  override protected def afterAll(): Unit = {
+    try {
+      super.afterAll()
+    } finally {
+      try {
+        if (admin != null) {
+          admin.close()
+          admin = null
+        }
+      } finally {
+        try {
+          if (conn != null) {
+            conn.close()
+            conn = null
+          }
+        } finally {
+          flussServer.close()
+        }
+      }
+    }
+  }
+
+ /** Returns the Fluss [[TablePath]] of `tableName` inside `DEFAULT_DATABASE`. */
def createTablePath(tableName: String): TablePath = {
TablePath.of(DEFAULT_DATABASE, tableName)
}
@@ -98,22 +113,9 @@ class FlussSparkTestBase extends QueryTest with
SharedSparkSession {
.map(record => (record.getChangeType.shortString(), record.getRow))
.toArray
}
-}
-
-@RegisterExtension
-object FlussSparkTestBase {
- val FLUSS_CLUSTER_EXTENSION: FlussClusterExtension =
- FlussClusterExtension.builder
- .setClusterConf(
- new Configuration()
- .set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1))
- .set(ConfigOptions.DATALAKE_FORMAT, DataLakeFormat.PAIMON)
- )
- .setNumOfTabletServers(3)
- .build
- FLUSS_CLUSTER_EXTENSION.start()
-
- val clientConf: Configuration = FLUSS_CLUSTER_EXTENSION.getClientConfig
- val bootstrapServers: String = FLUSS_CLUSTER_EXTENSION.getBootstrapServers
+  /**
+   * Cluster configuration used to build `flussServer`: KV snapshots every second.
+   *
+   * Subclasses override this to layer extra settings (for example datalake options) on top of
+   * the configuration returned here.
+   */
+  protected def flussConf: Configuration = {
+    val clusterConf = new Configuration
+    clusterConf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1))
+    clusterConf
+  }
}
diff --git
a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala
index 242e7998e..e30b03baa 100644
---
a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala
+++
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala
@@ -34,6 +34,8 @@ import scala.collection.JavaConverters._
class SparkCatalogTest extends FlussSparkTestBase {
+  /**
+   * Datalake format the suite's Fluss cluster is configured with, or `None` when no datalake is
+   * configured. Lake-enabled subclasses override it so the shared tests adapt their expectations.
+   */
+  protected def lakeFormat: Option[DataLakeFormat] = Option.empty[DataLakeFormat]
+
test("Catalog: namespaces") {
// Always a default database 'fluss'.
checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) :: Nil)
@@ -197,27 +199,28 @@ class SparkCatalogTest extends FlussSparkTestBase {
checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) :: Nil)
}
- test("Catalog: set/remove table properties") {
+ protected def modifyTablePropertiesWithCheck(): Unit = {
withTable("t") {
sql(
s"CREATE TABLE $DEFAULT_DATABASE.t (id int, name string)
TBLPROPERTIES('key1' = 'value1', '${SparkConnectorOptions.BUCKET_NUMBER.key()}'
= 3)")
var flussTable = admin.getTableInfo(createTablePath("t")).get()
assertResult(flussTable.getNumBuckets, "check bucket num")(3)
- assertResult(
+ val expectTableProperties = if (lakeFormat.isDefined) {
Map(
ConfigOptions.TABLE_REPLICATION_FACTOR.key() -> "1",
- ConfigOptions.TABLE_DATALAKE_FORMAT.key() -> "paimon"),
- "check table properties")(flussTable.getProperties.toMap.asScala)
+ ConfigOptions.TABLE_DATALAKE_FORMAT.key() -> lakeFormat.get.toString)
+ } else {
+ Map(ConfigOptions.TABLE_REPLICATION_FACTOR.key() -> "1")
+ }
+ assertResult(expectTableProperties, "check table properties")(
+ flussTable.getProperties.toMap.asScala)
assert(
flussTable.getCustomProperties.toMap.asScala.getOrElse("key1",
"non-exists") == "value1")
sql("ALTER TABLE t SET TBLPROPERTIES('key1' = 'value2', 'key2' =
'value2')")
flussTable = admin.getTableInfo(createTablePath("t")).get()
- assertResult(
- Map(
- ConfigOptions.TABLE_REPLICATION_FACTOR.key() -> "1",
- ConfigOptions.TABLE_DATALAKE_FORMAT.key() -> "paimon"),
- "check table properties")(flussTable.getProperties.toMap.asScala)
+ assertResult(expectTableProperties, "check table properties")(
+ flussTable.getProperties.toMap.asScala)
assert(
flussTable.getCustomProperties.toMap.asScala.getOrElse("key1",
"non-exists") == "value2")
assert(
@@ -231,18 +234,6 @@ class SparkCatalogTest extends FlussSparkTestBase {
// no error if unset not-exists key
sql("ALTER TABLE t UNSET TBLPROPERTIES('key1')")
- sql(
- s"ALTER TABLE t SET
TBLPROPERTIES('${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = 'true')")
- flussTable = admin.getTableInfo(createTablePath("t")).get()
- assertResult(
- Map(
- ConfigOptions.TABLE_REPLICATION_FACTOR.key() -> "1",
- ConfigOptions.TABLE_DATALAKE_FORMAT.key() -> "paimon",
- ConfigOptions.TABLE_DATALAKE_ENABLED.key() -> "true"
- ),
- "check table properties"
- )(flussTable.getProperties.toMap.asScala)
-
// Most table properties with prefix of 'table.' are not allowed to be
modified.
intercept[ExecutionException] {
sql(
@@ -251,6 +242,10 @@ class SparkCatalogTest extends FlussSparkTestBase {
}
}
+  // The checks live in `modifyTablePropertiesWithCheck` so subclasses inherit this test and run
+  // it against their own `lakeFormat`.
+  test("Catalog: set/remove table properties")(modifyTablePropertiesWithCheck())
+
test("Partition: show partitions") {
withTable("t") {
sql(s"CREATE TABLE t (id int, name string, pt1 string, pt2 int)
PARTITIONED BY (pt1, pt2)")
diff --git
a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonCatalogTest.scala
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonCatalogTest.scala
new file mode 100644
index 000000000..d5a29e163
--- /dev/null
+++
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/paimon/SparkLakePaimonCatalogTest.scala
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.lake.paimon
+
+import org.apache.fluss.config.{ConfigOptions, Configuration, FlussConfigUtils}
+import org.apache.fluss.exception.FlussRuntimeException
+import org.apache.fluss.lake.paimon.utils.PaimonConversions
+import org.apache.fluss.metadata._
+import org.apache.fluss.metadata.TableDescriptor.{BUCKET_COLUMN_NAME,
OFFSET_COLUMN_NAME, TIMESTAMP_COLUMN_NAME}
+import org.apache.fluss.server.utils.LakeStorageUtils
+import org.apache.fluss.spark.SparkCatalogTest
+import org.apache.fluss.spark.SparkConnectorOptions.{BUCKET_KEY,
BUCKET_NUMBER, PRIMARY_KEY}
+
+import org.scalatest.matchers.must.Matchers.contain
+import org.scalatest.matchers.should.Matchers.{a, convertToAnyShouldWrapper}
+
+import java.nio.file.Files
+
+import scala.collection.JavaConverters._
+
+/**
+ * Lake-enabled variant of [[SparkCatalogTest]].
+ *
+ * It runs the inherited catalog DDL tests against a Fluss cluster configured with a Paimon
+ * datalake. It also checks that datalake-enabled tables are created in Paimon with the expected
+ * schema, keys, bucketing, options and comment.
+ */
+class SparkLakePaimonCatalogTest extends SparkCatalogTest {
+
+  import org.apache.paimon.CoreOptions
+  import org.apache.paimon.catalog.{
+    Catalog => PaimonCatalog,
+    CatalogContext,
+    CatalogFactory,
+    Identifier
+  }
+  import org.apache.paimon.options.{Options => PaimonOptions}
+  import org.apache.paimon.table.{Table => PaimonTable}
+  import org.apache.paimon.types.{DataType, DataTypes, RowType}
+
+  // Both fields are assigned from `flussConf`, which the base class may invoke while it is still
+  // being constructed. Keep the `= _` default initializers: an explicit `= null` would be
+  // assigned after the superclass constructor and discard the values set there.
+  private var paimonCatalog: PaimonCatalog = _
+  private var warehousePath: String = _
+
+  override protected def lakeFormat: Option[DataLakeFormat] = Some(DataLakeFormat.PAIMON)
+
+  /**
+   * Extends the base cluster configuration with a filesystem-backed Paimon datalake rooted in a
+   * temporary warehouse. It also opens a Paimon catalog on that same warehouse, so tests can
+   * inspect the lake tables Fluss creates.
+   *
+   * The warehouse and catalog are created only on the first call. Later calls therefore return a
+   * configuration that points at the catalog the tests use.
+   */
+  override def flussConf: Configuration = {
+    val conf = super.flussConf
+    conf.setInt(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, Integer.MAX_VALUE)
+    conf.setString("datalake.format", lakeFormat.get.toString)
+    conf.setString("datalake.paimon.metastore", "filesystem")
+    conf.setString("datalake.paimon.cache-enabled", "false")
+    if (warehousePath == null) {
+      warehousePath = createWarehousePath()
+    }
+    conf.setString("datalake.paimon.warehouse", warehousePath)
+    if (paimonCatalog == null) {
+      paimonCatalog = CatalogFactory.createCatalog(
+        CatalogContext.create(
+          PaimonOptions.fromMap(LakeStorageUtils.extractLakeProperties(conf))))
+    }
+    conf
+  }
+
+  /**
+   * Creates a fresh temporary directory and returns the path of a `warehouse` entry inside it.
+   *
+   * @throws FlussRuntimeException if the directory cannot be created; the cause is preserved.
+   */
+  private def createWarehousePath(): String =
+    try {
+      Files.createTempDirectory("fluss-testing-datalake-tiered").resolve("warehouse").toString
+    } catch {
+      case e: Exception =>
+        throw new FlussRuntimeException("Failed to create warehouse path", e)
+    }
+
+  /** Closes the Paimon catalog opened in `flussConf`, after the parent suite's cleanup. */
+  override protected def afterAll(): Unit = {
+    try {
+      super.afterAll()
+    } finally {
+      if (paimonCatalog != null) {
+        paimonCatalog.close()
+        paimonCatalog = null
+      }
+    }
+  }
+
+  /**
+   * Runs `f` like the inherited `withTable`, then also drops the Paimon tables of the same names.
+   * Later tests that reuse a table name therefore start from a clean warehouse.
+   *
+   * The Paimon cleanup runs in a `finally`, so it happens even when `f` fails.
+   */
+  override protected def withTable(tableNames: String*)(f: => Unit): Unit = {
+    try {
+      super.withTable(tableNames: _*)(f)
+    } finally {
+      tableNames.foreach(tableName => paimonCatalog.dropTable(paimonIdentifier(tableName), true))
+    }
+  }
+
+  /** Paimon identifier of `tableName` in the default database. */
+  private def paimonIdentifier(tableName: String): Identifier =
+    Identifier.create(DEFAULT_DATABASE, tableName)
+
+  /** Loads the Paimon table named `tableName` from the default database. */
+  private def loadPaimonTable(tableName: String): PaimonTable =
+    paimonCatalog.getTable(paimonIdentifier(tableName))
+
+  /** Loads the Fluss table info of `tableName` in the default database. */
+  private def loadFlussTable(tableName: String): TableInfo =
+    admin.getTableInfo(createTablePath(tableName)).get()
+
+  /**
+   * Builds the expected Paimon row type of a lake table. It contains the given user columns, in
+   * order, followed by the Fluss system columns (bucket, offset, timestamp).
+   */
+  private def lakeRowType(userColumns: (DataType, String)*): RowType = {
+    val columns = userColumns ++ Seq[(DataType, String)](
+      DataTypes.INT() -> BUCKET_COLUMN_NAME,
+      DataTypes.BIGINT() -> OFFSET_COLUMN_NAME,
+      DataTypes.TIMESTAMP_LTZ_MILLIS() -> TIMESTAMP_COLUMN_NAME)
+    RowType.of(columns.map(_._1).toArray, columns.map(_._2).toArray)
+  }
+
+  /**
+   * Asserts that `paimonTable` mirrors `flussTable`. The checks cover:
+   *  - primary keys and partition keys;
+   *  - bucket count and bucket key;
+   *  - options: Fluss `paimon.*` properties with the prefix stripped, and every other
+   *    non-datalake property under a `fluss.` prefix;
+   *  - column types (ignoring field ids) and the table comment.
+   *
+   * @param expectedBucketKey comma-separated bucket key columns, or `None` when the table has no
+   *                          bucket key
+   * @param bucketNum expected bucket count of the Fluss table, and of the Paimon table when a
+   *                  bucket key is expected
+   */
+  private def verifyLakePaimonTable(
+      paimonTable: PaimonTable,
+      flussTable: TableInfo,
+      expectedPaimonRowType: RowType,
+      expectedBucketKey: Option[String],
+      bucketNum: Int): Unit = {
+    if (flussTable.hasPrimaryKey) {
+      assertResult(flussTable.getSchema.getPrimaryKey.get().getColumnNames, "check primary key")(
+        paimonTable.primaryKeys())
+    } else {
+      assert(paimonTable.primaryKeys().isEmpty)
+    }
+
+    if (flussTable.isPartitioned) {
+      assertResult(flussTable.getPartitionKeys, "check partition keys")(
+        paimonTable.partitionKeys())
+    }
+
+    assert(flussTable.getNumBuckets == bucketNum)
+    if (expectedBucketKey.isDefined) {
+      val paimonBucketNum = paimonTable
+        .options()
+        .asScala
+        .getOrElse(CoreOptions.BUCKET.key(), CoreOptions.BUCKET.defaultValue().toString)
+        .toInt
+      assert(paimonBucketNum == bucketNum)
+    }
+
+    if (flussTable.hasBucketKey) {
+      assertResult(expectedBucketKey, "check fluss table bucket key")(
+        Some(flussTable.getBucketKeys.asScala.mkString(",")))
+      assertResult(expectedBucketKey, "check paimon table bucket key")(
+        Option(paimonTable.options().get(CoreOptions.BUCKET_KEY.key())))
+    }
+
+    val datalakePrefix = FlussConfigUtils.TABLE_PREFIX + "datalake."
+    val expectedProperties =
+      (flussTable.getProperties.toMap.asScala ++ flussTable.getCustomProperties.toMap.asScala)
+        .filterNot { case (key, _) => key.startsWith(datalakePrefix) }
+        .map {
+          case (key, value) =>
+            if (key.startsWith("paimon.")) key.stripPrefix("paimon.") -> value
+            else s"fluss.$key" -> value
+        }
+        .toMap
+    paimonTable.options().asScala.should(contain).allElementsOf(expectedProperties)
+
+    val paimonRowType = paimonTable.rowType()
+    assert(
+      paimonRowType.getFieldCount == expectedPaimonRowType.getFieldCount,
+      s"check field count: $paimonRowType vs $expectedPaimonRowType")
+    paimonRowType.getFields.asScala.zip(expectedPaimonRowType.getFields.asScala).foreach {
+      case (actual, expected) =>
+        assert(actual.equalsIgnoreFieldId(expected), s"check table schema: $actual vs $expected")
+    }
+
+    assertResult(flussTable.getComment, "check table comments")(paimonTable.comment())
+  }
+
+  test("Lake Catalog: basic table") {
+    // bucket log table
+    withTable("bucket_log_table") {
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.bucket_log_table (id int, name string)
+             | TBLPROPERTIES (
+             |  '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true, '${BUCKET_KEY.key()}' = 'id',
+             |  '${BUCKET_NUMBER.key()}' = 2, 'k1' = 'v1', 'paimon.file.format' = 'parquet')
+             |""".stripMargin)
+
+      verifyLakePaimonTable(
+        loadPaimonTable("bucket_log_table"),
+        loadFlussTable("bucket_log_table"),
+        lakeRowType(DataTypes.INT() -> "id", DataTypes.STRING() -> "name"),
+        expectedBucketKey = Some("id"),
+        bucketNum = 2)
+    }
+
+    // non-bucket log table
+    withTable("non_bucket_log_table") {
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.non_bucket_log_table (id int, name string)
+             | TBLPROPERTIES (
+             |  '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true,
+             |  '${BUCKET_NUMBER.key()}' = 2, 'k1' = 'v1', 'paimon.file.format' = 'parquet')
+             |""".stripMargin)
+
+      verifyLakePaimonTable(
+        loadPaimonTable("non_bucket_log_table"),
+        loadFlussTable("non_bucket_log_table"),
+        lakeRowType(DataTypes.INT() -> "id", DataTypes.STRING() -> "name"),
+        expectedBucketKey = None,
+        bucketNum = 2)
+    }
+
+    // pk table
+    withTable("pk_table") {
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.pk_table (id int, name string, pk1 string)
+             | TBLPROPERTIES (
+             |  '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true, '${BUCKET_KEY.key()}' = 'id',
+             |  '${PRIMARY_KEY.key()}' = 'pk1, id',
+             |  '${BUCKET_NUMBER.key()}' = 2, 'k1' = 'v1', 'paimon.file.format' = 'parquet')
+             |""".stripMargin)
+
+      verifyLakePaimonTable(
+        loadPaimonTable("pk_table"),
+        loadFlussTable("pk_table"),
+        lakeRowType(
+          DataTypes.INT().notNull() -> "id",
+          DataTypes.STRING() -> "name",
+          DataTypes.STRING().notNull() -> "pk1"),
+        expectedBucketKey = Some("id"),
+        bucketNum = 2
+      )
+    }
+
+    // partitioned pk table
+    withTable("partitioned_pk_table") {
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.partitioned_pk_table
+             | (id int, name string, pk1 string, pt1 string)
+             | PARTITIONED BY (pt1)
+             | TBLPROPERTIES (
+             |  '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true, '${BUCKET_KEY.key()}' = 'id',
+             |  '${PRIMARY_KEY.key()}' = 'pk1, id, pt1',
+             |  '${BUCKET_NUMBER.key()}' = 2, 'k1' = 'v1', 'paimon.file.format' = 'parquet')
+             |""".stripMargin)
+
+      verifyLakePaimonTable(
+        loadPaimonTable("partitioned_pk_table"),
+        loadFlussTable("partitioned_pk_table"),
+        lakeRowType(
+          DataTypes.INT().notNull() -> "id",
+          DataTypes.STRING() -> "name",
+          DataTypes.STRING().notNull() -> "pk1",
+          DataTypes.STRING().notNull() -> "pt1"),
+        expectedBucketKey = Some("id"),
+        bucketNum = 2
+      )
+    }
+  }
+
+  test("Lake Catalog: table with all types") {
+    withTable("t") {
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.t
+             | (c1 boolean, c2 byte, c3 short, c4 int, c5 long, c6 float, c7 double, c8 date,
+             | c9 timestamp, c10 timestamp_ntz, c11 string, c12 binary, c13 decimal(10, 2),
+             | c14 array<int>, c15 struct<a int, b string>, c16 map<string, int>)
+             | TBLPROPERTIES (
+             |  '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true,
+             |  '${BUCKET_NUMBER.key()}' = 2, 'k1' = 'v1', 'paimon.file.format' = 'parquet')
+             |""".stripMargin)
+
+      verifyLakePaimonTable(
+        loadPaimonTable("t"),
+        loadFlussTable("t"),
+        lakeRowType(
+          DataTypes.BOOLEAN() -> "c1",
+          DataTypes.TINYINT() -> "c2",
+          DataTypes.SMALLINT() -> "c3",
+          DataTypes.INT() -> "c4",
+          DataTypes.BIGINT() -> "c5",
+          DataTypes.FLOAT() -> "c6",
+          DataTypes.DOUBLE() -> "c7",
+          DataTypes.DATE() -> "c8",
+          DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE() -> "c9",
+          DataTypes.TIMESTAMP() -> "c10",
+          DataTypes.STRING() -> "c11",
+          DataTypes.BYTES() -> "c12",
+          DataTypes.DECIMAL(10, 2) -> "c13",
+          DataTypes.ARRAY(DataTypes.INT()) -> "c14",
+          DataTypes.ROW(
+            DataTypes.FIELD(0, "a", DataTypes.INT()),
+            DataTypes.FIELD(1, "b", DataTypes.STRING())) -> "c15",
+          DataTypes.MAP(DataTypes.STRING().notNull(), DataTypes.INT()) -> "c16"
+        ),
+        expectedBucketKey = None,
+        bucketNum = 2
+      )
+    }
+  }
+
+  test("Lake Catalog: unsettable properties") {
+    withTable("t") {
+      val unsettableProperties =
+        PaimonConversions.PAIMON_UNSETTABLE_OPTIONS.asScala.map(e => s"'$e' = 'v'").mkString(", ")
+
+      intercept[java.util.concurrent.ExecutionException] {
+        sql(s"""
+               |CREATE TABLE $DEFAULT_DATABASE.t (id int, name string)
+               | TBLPROPERTIES (
+               |  '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true,
+               |  $unsettableProperties,
+               |  '${BUCKET_NUMBER.key()}' = 2, 'k1' = 'v1', 'paimon.file.format' = 'parquet')
+               |""".stripMargin)
+      }.getCause.shouldBe(a[org.apache.fluss.exception.InvalidConfigException])
+    }
+  }
+
+  test("Lake Catalog: alter table with lake enabled") {
+    withTable("t") {
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.t (id int, name string)
+             | TBLPROPERTIES (
+             |  '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = false,
+             |  '${BUCKET_NUMBER.key()}' = 2, 'k1' = 'v1', 'paimon.file.format' = 'parquet')
+             |""".stripMargin)
+      // With the datalake disabled at creation time, no Paimon table exists yet.
+      intercept[PaimonCatalog.TableNotExistException] {
+        loadPaimonTable("t")
+      }
+
+      sql(
+        s"ALTER TABLE $DEFAULT_DATABASE.t SET TBLPROPERTIES ('${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true)")
+
+      verifyLakePaimonTable(
+        loadPaimonTable("t"),
+        loadFlussTable("t"),
+        lakeRowType(DataTypes.INT() -> "id", DataTypes.STRING() -> "name"),
+        expectedBucketKey = None,
+        bucketNum = 2)
+    }
+  }
+}