This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 5ed3b37b9 [spark] Initiate fluss-spark and introduce spark catalog and table (#2219)
5ed3b37b9 is described below
commit 5ed3b37b95f9a596e3fc8b4dbc41e1e76518dc6f
Author: Yann Byron <[email protected]>
AuthorDate: Fri Dec 26 20:30:27 2025 +0800
[spark] Initiate fluss-spark and introduce spark catalog and table (#2219)
---
.github/workflows/ci-template.yaml | 2 +-
.github/workflows/stage.sh | 22 +-
.scalafmt.conf | 70 +++++
copyright.txt | 17 ++
.../org/apache/fluss/config/Configuration.java | 4 +-
fluss-spark/fluss-spark-3.4/pom.xml | 129 ++++++++
fluss-spark/fluss-spark-3.5/pom.xml | 129 ++++++++
fluss-spark/fluss-spark-common/pom.xml | 71 +++++
.../org/apache/fluss/spark/SparkCatalog.scala | 109 +++++++
.../apache/fluss/spark/SparkConnectorOptions.scala | 61 ++++
.../org/apache/fluss/spark/SparkConversions.scala | 100 ++++++
.../scala/org/apache/fluss/spark/SparkTable.scala | 25 ++
.../fluss/spark/catalog/AbstractSparkTable.scala | 43 +++
.../spark/catalog/SupportsFlussNamespaces.scala | 95 ++++++
.../catalog/SupportsFlussPartitionManagement.scala | 53 ++++
.../fluss/spark/catalog/WithFlussAdmin.scala | 60 ++++
.../spark/types/FlussToSparkTypeVisitor.scala | 116 +++++++
.../spark/types/SparkToFlussTypeVisitor.scala | 99 ++++++
.../apache/spark/sql/FlussIdentityTransform.scala | 31 ++
fluss-spark/fluss-spark-ut/pom.xml | 125 ++++++++
.../org/apache/fluss/spark/FlussCatalogTest.scala | 142 +++++++++
.../apache/fluss/spark/FlussSparkTestBase.scala | 81 +++++
fluss-spark/pom.xml | 337 +++++++++++++++++++++
fluss-test-coverage/pom.xml | 41 +++
pom.xml | 19 ++
25 files changed, 1978 insertions(+), 3 deletions(-)
diff --git a/.github/workflows/ci-template.yaml b/.github/workflows/ci-template.yaml
index c5f77af3a..7179b0c68 100644
--- a/.github/workflows/ci-template.yaml
+++ b/.github/workflows/ci-template.yaml
@@ -36,7 +36,7 @@ jobs:
strategy:
fail-fast: false
matrix:
- module: [ core, flink, lake ]
+ module: [ core, flink, spark3, lake ]
name: "${{ matrix.module }}"
steps:
- name: Checkout code
diff --git a/.github/workflows/stage.sh b/.github/workflows/stage.sh
index 18587dc29..b35b74e47 100755
--- a/.github/workflows/stage.sh
+++ b/.github/workflows/stage.sh
@@ -19,6 +19,7 @@
STAGE_CORE="core"
STAGE_FLINK="flink"
+STAGE_SPARK="spark3"
STAGE_LAKE="lake"
MODULES_FLINK="\
@@ -28,6 +29,20 @@ fluss-flink/fluss-flink-2.2,\
fluss-flink/fluss-flink-1.20,\
"
+MODULES_COMMON_SPARK="\
+fluss-spark,\
+fluss-spark/fluss-spark-common,\
+fluss-spark/fluss-spark-ut,\
+"
+
+MODULES_SPARK3="\
+fluss-spark,\
+fluss-spark/fluss-spark-common,\
+fluss-spark/fluss-spark-ut,\
+fluss-spark/fluss-spark-3.5,\
+fluss-spark/fluss-spark-3.4,\
+"
+
# we move Flink legacy version tests to "lake" module for balancing testing time
MODULES_LAKE="\
fluss-flink/fluss-flink-1.19,\
@@ -42,10 +57,12 @@ function get_test_modules_for_stage() {
local stage=$1
local modules_flink=$MODULES_FLINK
+ local modules_spark3=$MODULES_SPARK3
local modules_lake=$MODULES_LAKE
local negated_flink=\!${MODULES_FLINK//,/,\!}
+ local negated_spark=\!${MODULES_COMMON_SPARK//,/,\!}
local negated_lake=\!${MODULES_LAKE//,/,\!}
- local modules_core="$negated_flink,$negated_lake"
+ local modules_core="$negated_flink,$negated_spark,$negated_lake"
case ${stage} in
(${STAGE_CORE})
@@ -54,6 +71,9 @@ function get_test_modules_for_stage() {
(${STAGE_FLINK})
echo "-pl fluss-test-coverage,$modules_flink"
;;
+ (${STAGE_SPARK})
+ echo "-Pspark3 -pl fluss-test-coverage,$modules_spark3"
+ ;;
(${STAGE_LAKE})
echo "-pl fluss-test-coverage,$modules_lake"
;;
diff --git a/.scalafmt.conf b/.scalafmt.conf
new file mode 100644
index 000000000..7a7c55f0f
--- /dev/null
+++ b/.scalafmt.conf
@@ -0,0 +1,70 @@
+runner.dialect = scala212
+
+# Version is required to make sure IntelliJ picks the right version
+version = 3.10.2
+preset = default
+
+# Max column
+maxColumn = 100
+
+# This parameter simply says the .stripMargin method was not redefined by the user to assign
+# special meaning to indentation preceding the | character. Hence, that indentation can be modified.
+assumeStandardLibraryStripMargin = true
+align.stripMargin = true
+
+# Align settings
+align.preset = none
+align.closeParenSite = false
+align.openParenCallSite = false
+danglingParentheses.defnSite = false
+danglingParentheses.callSite = false
+danglingParentheses.ctrlSite = true
+danglingParentheses.tupleSite = false
+align.openParenCallSite = false
+align.openParenDefnSite = false
+align.openParenTupleSite = false
+
+# Newlines
+newlines.alwaysBeforeElseAfterCurlyIf = false
+newlines.beforeCurlyLambdaParams = multiline # Newline before lambda params
+newlines.afterCurlyLambdaParams = squash # No newline after lambda params
+newlines.inInterpolation = "avoid"
+newlines.avoidInResultType = true
+optIn.annotationNewlines = true
+
+# Scaladoc
+docstrings.style = Asterisk # Javadoc style
+docstrings.removeEmpty = true
+docstrings.oneline = fold
+docstrings.forceBlankLineBefore = true
+
+# Indentation
+indent.extendSite = 2 # This makes sure extend is not indented as the ctor parameters
+
+# Rewrites
+rewrite.rules = [AvoidInfix, Imports, RedundantBraces, SortModifiers]
+
+# Imports
+rewrite.imports.sort = scalastyle
+rewrite.imports.groups = [
+ ["org.apache.fluss\\..*"],
+ ["org.apache.fluss.shade\\..*"],
+ [".*"],
+ ["javax\\..*"],
+ ["java\\..*"],
+ ["scala\\..*"]
+]
+rewrite.imports.contiguousGroups = no
+importSelectors = singleline # Imports in a single line, like IntelliJ
+
+# Remove redundant braces in string interpolation.
+rewrite.redundantBraces.stringInterpolation = true
+rewrite.redundantBraces.defnBodies = false
+rewrite.redundantBraces.generalExpressions = false
+rewrite.redundantBraces.ifElseExpressions = false
+rewrite.redundantBraces.methodBodies = false
+rewrite.redundantBraces.includeUnitMethods = false
+rewrite.redundantBraces.maxBreaks = 1
+
+# Remove trailing commas
+rewrite.trailingCommas.style = "never"
diff --git a/copyright.txt b/copyright.txt
new file mode 100644
index 000000000..29400e587
--- /dev/null
+++ b/copyright.txt
@@ -0,0 +1,17 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
diff --git a/fluss-common/src/main/java/org/apache/fluss/config/Configuration.java b/fluss-common/src/main/java/org/apache/fluss/config/Configuration.java
index a7ee42f75..2c985f5a1 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/Configuration.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/Configuration.java
@@ -18,6 +18,7 @@
package org.apache.fluss.config;
import org.apache.fluss.annotation.PublicStable;
+import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.utils.CollectionUtils;
import org.slf4j.Logger;
@@ -643,7 +644,8 @@ public class Configuration implements Serializable, ReadableConfig {
}
}
- private Optional<Object> getRawValue(String key) {
+ @VisibleForTesting
+ public Optional<Object> getRawValue(String key) {
if (key == null) {
throw new NullPointerException("Key must not be null.");
}
diff --git a/fluss-spark/fluss-spark-3.4/pom.xml
b/fluss-spark/fluss-spark-3.4/pom.xml
new file mode 100644
index 000000000..a5d10f39a
--- /dev/null
+++ b/fluss-spark/fluss-spark-3.4/pom.xml
@@ -0,0 +1,129 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.fluss</groupId>
+ <artifactId>fluss-spark</artifactId>
+ <version>0.9-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>fluss-spark-3.4_${scala.binary.version}</artifactId>
+ <name>Fluss : Engine Spark : 3.4</name>
+
+ <properties>
+ <spark.version>3.4.3</spark.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.fluss</groupId>
+ <artifactId>fluss-spark-common_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.fluss</groupId>
+ <artifactId>fluss-spark-ut_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <!-- compilation of main sources -->
+ <skipMain>${skip.on.java8}</skipMain>
+ <!-- compilation of test sources -->
+ <skip>${skip.on.java8}</skip>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>3.0.0-M5</version>
+ <executions>
+ <!-- Test end with ITCase is e2e test in this module -->
+ <execution>
+ <id>integration-tests</id>
+ <phase>integration-test</phase>
+ <inherited>false</inherited>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ <configuration>
+ <skip>${skip.on.java8}</skip>
+ <includes>
+ <include>**/*ITCase.*</include>
+ </includes>
+ <!-- e2e test with flink/zookeeper cluster, we set forkCount=1 -->
+ <forkCount>1</forkCount>
+ </configuration>
+ </execution>
+ <!-- others unit tests -->
+ <execution>
+ <id>default-test</id>
+ <phase>test</phase>
+ <inherited>false</inherited>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ <configuration>
+ <skip>${skip.on.java8}</skip>
+ <excludes>
+ <exclude>**/*ITCase.*</exclude>
+ </excludes>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>shade-fluss</id>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <artifactSet>
+ <includes combine.children="append">
+ <include>org.apache.fluss:fluss-spark-common</include>
+ <include>org.apache.fluss:fluss-client</include>
+ </includes>
+ </artifactSet>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/fluss-spark/fluss-spark-3.5/pom.xml
b/fluss-spark/fluss-spark-3.5/pom.xml
new file mode 100644
index 000000000..1f55f0e12
--- /dev/null
+++ b/fluss-spark/fluss-spark-3.5/pom.xml
@@ -0,0 +1,129 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.fluss</groupId>
+ <artifactId>fluss-spark</artifactId>
+ <version>0.9-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>fluss-spark-3.5_${scala.binary.version}</artifactId>
+ <name>Fluss : Engine Spark : 3.5</name>
+
+ <properties>
+ <spark.version>3.5.7</spark.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.fluss</groupId>
+ <artifactId>fluss-spark-common_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.fluss</groupId>
+ <artifactId>fluss-spark-ut_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <!-- compilation of main sources -->
+ <skipMain>${skip.on.java8}</skipMain>
+ <!-- compilation of test sources -->
+ <skip>${skip.on.java8}</skip>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>3.0.0-M5</version>
+ <executions>
+ <!-- Test end with ITCase is e2e test in this module -->
+ <execution>
+ <id>integration-tests</id>
+ <phase>integration-test</phase>
+ <inherited>false</inherited>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ <configuration>
+ <skip>${skip.on.java8}</skip>
+ <includes>
+ <include>**/*ITCase.*</include>
+ </includes>
+ <!-- e2e test with flink/zookeeper cluster, we set forkCount=1 -->
+ <forkCount>1</forkCount>
+ </configuration>
+ </execution>
+ <!-- others unit tests -->
+ <execution>
+ <id>default-test</id>
+ <phase>test</phase>
+ <inherited>false</inherited>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ <configuration>
+ <skip>${skip.on.java8}</skip>
+ <excludes>
+ <exclude>**/*ITCase.*</exclude>
+ </excludes>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>shade-fluss</id>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <artifactSet>
+ <includes combine.children="append">
+ <include>org.apache.fluss:fluss-spark-common</include>
+ <include>org.apache.fluss:fluss-client</include>
+ </includes>
+ </artifactSet>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/fluss-spark/fluss-spark-common/pom.xml
b/fluss-spark/fluss-spark-common/pom.xml
new file mode 100644
index 000000000..e285b8bbc
--- /dev/null
+++ b/fluss-spark/fluss-spark-common/pom.xml
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.fluss</groupId>
+ <artifactId>fluss-spark</artifactId>
+ <version>0.9-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>fluss-spark-common_${scala.binary.version}</artifactId>
+ <name>Fluss : Engine Spark : Common</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>shade-fluss</id>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <artifactSet>
+ <includes combine.children="append">
+ <include>org.apache.fluss:fluss-spark-common</include>
+ <include>org.apache.fluss:fluss-client</include>
+ </includes>
+ </artifactSet>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala
new file mode 100644
index 000000000..b620d2700
--- /dev/null
+++
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark
+
+import org.apache.fluss.exception.{DatabaseNotExistException, TableAlreadyExistException, TableNotExistException}
+import org.apache.fluss.metadata.TablePath
+import org.apache.fluss.spark.catalog.{SupportsFlussNamespaces, WithFlussAdmin}
+
+import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
+import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog, TableChange}
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import java.util
+import java.util.concurrent.ExecutionException
+
+import scala.collection.JavaConverters._
+
+class SparkCatalog extends TableCatalog with SupportsFlussNamespaces with WithFlussAdmin {
+
+ private var catalogName: String = "fluss"
+
+ override def listTables(namespace: Array[String]): Array[Identifier] = {
+ doNamespaceOperator(namespace) {
+ admin
+ .listTables(namespace.head)
+ .get()
+ .asScala
+ .map(table => Identifier.of(namespace, table))
+ .toArray
+ }
+ }
+
+ override def loadTable(ident: Identifier): Table = {
+ try {
+ SparkTable(admin.getTableInfo(toTablePath(ident)).get())
+ } catch {
+ case e: ExecutionException if e.getCause.isInstanceOf[TableNotExistException] =>
+ throw new NoSuchTableException(ident)
+ }
+ }
+
+ override def createTable(
+ ident: Identifier,
+ schema: StructType,
+ partitions: Array[Transform],
+ properties: util.Map[String, String]): Table = {
+ try {
+ val tableDescriptor = SparkConversions.toFlussTable(schema, partitions, properties)
+ admin.createTable(toTablePath(ident), tableDescriptor, false).get()
+ loadTable(ident)
+ } catch {
+ case e: ExecutionException =>
+ if (e.getCause.isInstanceOf[DatabaseNotExistException]) {
+ throw new NoSuchNamespaceException(ident.namespace())
+ } else if (e.getCause.isInstanceOf[TableAlreadyExistException]) {
+ throw new TableAlreadyExistsException(ident)
+ } else {
+ throw new RuntimeException(e)
+ }
+ }
+ }
+
+ override def alterTable(ident: Identifier, changes: TableChange*): Table = {
+ throw new UnsupportedOperationException("Altering table is not supported")
+ }
+
+ override def dropTable(ident: Identifier): Boolean = {
+ try {
+ admin.dropTable(toTablePath(ident), false).get()
+ true
+ } catch {
+ case e: ExecutionException if e.getCause.isInstanceOf[TableNotExistException] =>
+ throw new NoSuchTableException(ident)
+ }
+ }
+
+ override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = {
+ throw new UnsupportedOperationException("Renaming table is not supported")
+ }
+
+ override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
+ catalogName = name;
+ initFlussClient(options)
+ }
+
+ override def name(): String = catalogName
+
+ private def toTablePath(ident: Identifier): TablePath = {
+ assert(ident.namespace().length == 1, "Only single namespace is supported")
+ TablePath.of(ident.namespace().head, ident.name)
+ }
+}
diff --git
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConnectorOptions.scala
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConnectorOptions.scala
new file mode 100644
index 000000000..71b168985
--- /dev/null
+++
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConnectorOptions.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark
+
+import org.apache.fluss.config.{ConfigBuilder, ConfigOption}
+
+object SparkConnectorOptions {
+
+ val PRIMARY_KEY: ConfigOption[String] =
+ ConfigBuilder
+ .key("primary.key")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The primary keys of a Fluss table.")
+
+ val BUCKET_KEY: ConfigOption[String] =
+ ConfigBuilder
+ .key("bucket.key")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ """
+ |Specific the distribution policy of the Fluss table.
+ |Data will be distributed to each bucket according to the hash value of bucket-key (It must be a subset of the primary keys excluding partition keys of the primary key table).
+ |If you specify multiple fields, delimiter is ','.
+ |If the table has a primary key and a bucket key is not specified, the bucket key will be used as primary key(excluding the partition key).
+ |If the table has no primary key and the bucket key is not specified, the data will be distributed to each bucket randomly.
+ |""".stripMargin)
+
+ val BUCKET_NUMBER: ConfigOption[Integer] =
+ ConfigBuilder
+ .key("bucket.num")
+ .intType()
+ .noDefaultValue()
+ .withDescription("The number of buckets of a Fluss table.")
+
+ val COMMENT: ConfigOption[String] =
+ ConfigBuilder
+ .key("comment")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The comment of a Fluss table.")
+
+ val SPARK_TABLE_OPTIONS: Seq[String] =
+ Seq(PRIMARY_KEY, BUCKET_KEY, BUCKET_NUMBER, COMMENT).map(_.key)
+}
diff --git
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConversions.scala
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConversions.scala
new file mode 100644
index 000000000..f85e33f81
--- /dev/null
+++
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConversions.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark
+
+import org.apache.fluss.config.FlussConfigUtils
+import org.apache.fluss.metadata.{Schema, TableDescriptor}
+import org.apache.fluss.spark.SparkConnectorOptions._
+import org.apache.fluss.spark.types.{FlussToSparkTypeVisitor, SparkToFlussTypeVisitor}
+import org.apache.fluss.types.RowType
+
+import org.apache.spark.sql.FlussIdentityTransform
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.types.StructType
+
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+object SparkConversions {
+
+ def toFlussDataType(schema: StructType): RowType =
+ SparkToFlussTypeVisitor.visit(schema).asInstanceOf[RowType]
+
+ def toSparkDataType(rowType: RowType): StructType =
+ FlussToSparkTypeVisitor.visit(rowType).asInstanceOf[StructType]
+
+ def toFlussTable(
+ sparkSchema: StructType,
+ partitions: Array[Transform],
+ properties: util.Map[String, String]): TableDescriptor = {
+ val caseInsensitiveProps = CaseInsensitiveMap(properties.asScala.toMap)
+
+ val tableDescriptorBuilder = TableDescriptor.builder()
+ val schemaBuilder = Schema.newBuilder().fromRowType(toFlussDataType(sparkSchema))
+
+ val partitionKey = toPartitionKeys(partitions)
+ tableDescriptorBuilder.partitionedBy(partitionKey: _*)
+
+ val primaryKeys = if (caseInsensitiveProps.contains(PRIMARY_KEY.key)) {
+ val pks = caseInsensitiveProps.get(PRIMARY_KEY.key).get.split(",")
+ schemaBuilder.primaryKey(pks: _*)
+ pks
+ } else {
+ Array.empty[String]
+ }
+
+ if (caseInsensitiveProps.contains(BUCKET_NUMBER.key)) {
+ val bucketNum = caseInsensitiveProps.get(BUCKET_NUMBER.key).get.toInt
+ val bucketKeys = if (caseInsensitiveProps.contains(BUCKET_KEY.key)) {
+ caseInsensitiveProps.get(BUCKET_KEY.key).get.split(",")
+ } else {
+ primaryKeys.filterNot(partitionKey.contains)
+ }
+ tableDescriptorBuilder.distributedBy(bucketNum, bucketKeys: _*)
+ }
+
+ if (caseInsensitiveProps.contains(COMMENT.key)) {
+ tableDescriptorBuilder.comment(caseInsensitiveProps.get(COMMENT.key).get)
+ }
+
+ val (tableProps, customProps) =
+ caseInsensitiveProps.filterNot(SPARK_TABLE_OPTIONS.contains).partition {
+ case (key, _) => key.startsWith(FlussConfigUtils.TABLE_PREFIX)
+ }
+
+ tableDescriptorBuilder
+ .schema(schemaBuilder.build())
+ .properties(tableProps.asJava)
+ .customProperties(customProps.asJava)
+ .build()
+ }
+
+ def toPartitionKeys(partitions: Array[Transform]): Array[String] = {
+ val partitionKeys = mutable.ArrayBuffer.empty[String]
+ partitions.foreach {
+ case FlussIdentityTransform(parts) if parts.length == 1 =>
+ partitionKeys += parts.head
+ case p =>
+ throw new UnsupportedOperationException("Unsupported partition transform: " + p)
+ }
+ partitionKeys.toArray
+ }
+}
diff --git
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
new file mode 100644
index 000000000..1416ab415
--- /dev/null
+++
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark
+
+import org.apache.fluss.metadata.TableInfo
+import org.apache.fluss.spark.catalog.{AbstractSparkTable, SupportsFlussPartitionManagement}
+
+case class SparkTable(table: TableInfo)
+ extends AbstractSparkTable(table)
+ with SupportsFlussPartitionManagement {}
diff --git
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
new file mode 100644
index 000000000..8694c7f59
--- /dev/null
+++
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.catalog
+
+import org.apache.fluss.metadata.TableInfo
+import org.apache.fluss.spark.SparkConversions
+
+import org.apache.spark.sql.connector.catalog.{Table, TableCapability}
+import org.apache.spark.sql.types.StructType
+
+import java.util
+
+import scala.collection.JavaConverters._
+
/**
 * Base Spark [[Table]] backed by a Fluss [[TableInfo]]: converts the Fluss row type
 * to a Spark [[StructType]] and exposes the partition-key subset of that schema.
 */
abstract class AbstractSparkTable(tableInfo: TableInfo) extends Table {

  /** Spark schema converted lazily from the Fluss row type. */
  protected lazy val _schema: StructType =
    SparkConversions.toSparkDataType(tableInfo.getSchema.getRowType)

  /**
   * Schema restricted to the partition-key columns.
   *
   * Fix: compare by field name. `getPartitionKeys` is a `java.util.List[String]`, so the
   * previous `filter(tableInfo.getPartitionKeys.contains)` passed the whole `StructField`
   * to `contains(Object)` — it compiled but never matched, leaving the partition schema
   * always empty.
   */
  protected lazy val _partitionSchema = new StructType(
    _schema.fields.filter(f => tableInfo.getPartitionKeys.contains(f.name)))

  // NOTE(review): this renders the full TableInfo as the table name; presumably the
  // table path alone is intended — confirm against how Spark displays table names.
  override def name(): String = tableInfo.toString

  override def schema(): StructType = _schema

  // No read/write capabilities are advertised yet.
  override def capabilities(): util.Set[TableCapability] =
    Set.empty[TableCapability].asJava
}
diff --git
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/SupportsFlussNamespaces.scala
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/SupportsFlussNamespaces.scala
new file mode 100644
index 000000000..ddc83a54b
--- /dev/null
+++
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/SupportsFlussNamespaces.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.catalog
+
+import org.apache.fluss.exception.DatabaseNotExistException
+import org.apache.fluss.metadata.DatabaseDescriptor
+import org.apache.fluss.utils.Preconditions
+
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException
+import org.apache.spark.sql.connector.catalog.{NamespaceChange,
SupportsNamespaces}
+
+import java.util
+import java.util.concurrent.ExecutionException
+
+import scala.collection.JavaConverters._
+
/**
 * Implements Spark's [[SupportsNamespaces]] on top of the Fluss [[WithFlussAdmin]] admin
 * client. Fluss namespaces are single-level databases, so every namespace array must
 * contain exactly one element.
 */
trait SupportsFlussNamespaces extends SupportsNamespaces with WithFlussAdmin {

  /** Lists all Fluss databases as single-element namespaces. */
  override def listNamespaces(): Array[Array[String]] = {
    admin.listDatabases.get.asScala.map(Array(_)).toArray
  }

  override def listNamespaces(namespace: Array[String]): Array[Array[String]] = {
    if (namespace.isEmpty) {
      return listNamespaces()
    }

    // NOTE(review): returns the namespace itself (after verifying it exists) rather than
    // its children; since Fluss namespaces are single-level there are no children —
    // confirm this matches Spark's expectation for SHOW NAMESPACES IN <db>.
    doNamespaceOperator(namespace) {
      val dbname = admin.getDatabaseInfo(namespace.head).get().getDatabaseName
      Array(Array(dbname))
    }
  }

  /** Returns the database's custom properties; throws NoSuchNamespaceException if absent. */
  override def loadNamespaceMetadata(namespace: Array[String]): util.Map[String, String] = {
    doNamespaceOperator(namespace) {
      admin.getDatabaseInfo(namespace.head).get().getDatabaseDescriptor.getCustomProperties
    }
  }

  /** Creates a Fluss database carrying `metadata` as custom properties. */
  override def createNamespace(
      namespace: Array[String],
      metadata: util.Map[String, String]): Unit = {
    doNamespaceOperator(namespace) {
      val databaseDescriptor = DatabaseDescriptor
        .builder()
        .customProperties(metadata)
        .build()
      // ignoreIfExists = false: creating an existing database fails.
      admin.createDatabase(namespace.head, databaseDescriptor, false).get()
    }
  }

  override def alterNamespace(
      namespace: Array[String],
      namespaceChanges: NamespaceChange*): Unit = {
    // Fix: the exception was previously constructed but never thrown, so
    // ALTER NAMESPACE silently succeeded without applying any change.
    throw new UnsupportedOperationException("Altering namespace is not supported now.")
  }

  /** Drops the database; `cascade` also drops contained tables. */
  override def dropNamespace(namespace: Array[String], cascade: Boolean): Boolean = {
    doNamespaceOperator(namespace) {
      admin.dropDatabase(namespace.head, false, cascade).get()
      true
    }
  }

  /**
   * Validates the namespace then runs `f`, translating a wrapped
   * DatabaseNotExistException into Spark's NoSuchNamespaceException.
   */
  protected def doNamespaceOperator[T](namespace: Array[String])(f: => T): T = {
    checkNamespace(namespace)
    try {
      f
    } catch {
      case e: ExecutionException if e.getCause.isInstanceOf[DatabaseNotExistException] =>
        throw new NoSuchNamespaceException(namespace)
    }
  }

  private def checkNamespace(namespace: Array[String]): Unit = {
    Preconditions.checkArgument(
      namespace.length == 1,
      "Only single namespace is supported in Fluss.")
  }
}
diff --git
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/SupportsFlussPartitionManagement.scala
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/SupportsFlussPartitionManagement.scala
new file mode 100644
index 000000000..ed888e5fe
--- /dev/null
+++
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/SupportsFlussPartitionManagement.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.catalog
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement
+import org.apache.spark.sql.types.StructType
+
+import java.util
+
/**
 * Wires Spark's [[SupportsPartitionManagement]] into a Fluss-backed table. Only the
 * partition schema is exposed; every mutation/query hook fails fast because partition
 * management is not supported through Spark yet.
 */
trait SupportsFlussPartitionManagement extends AbstractSparkTable with SupportsPartitionManagement {

  /** Partition columns of the table, derived from the Fluss partition keys. */
  override def partitionSchema(): StructType = _partitionSchema

  override def createPartition(ident: InternalRow, properties: util.Map[String, String]): Unit =
    throw new UnsupportedOperationException("Creating partition is not supported")

  override def dropPartition(ident: InternalRow): Boolean =
    throw new UnsupportedOperationException("Dropping partition is not supported")

  override def replacePartitionMetadata(
      ident: InternalRow,
      properties: util.Map[String, String]): Unit =
    throw new UnsupportedOperationException("Replacing partition metadata is not supported")

  override def loadPartitionMetadata(ident: InternalRow): util.Map[String, String] =
    throw new UnsupportedOperationException("Loading partition is not supported")

  override def listPartitionIdentifiers(
      names: Array[String],
      ident: InternalRow): Array[InternalRow] =
    throw new UnsupportedOperationException("Listing partition is not supported")
}
diff --git
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/WithFlussAdmin.scala
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/WithFlussAdmin.scala
new file mode 100644
index 000000000..fa62b91a4
--- /dev/null
+++
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/WithFlussAdmin.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.catalog
+
+import org.apache.fluss.client.{Connection, ConnectionFactory}
+import org.apache.fluss.client.admin.Admin
+import org.apache.fluss.config.{Configuration => FlussConfiguration}
+import org.apache.fluss.utils.{IOUtils, Preconditions}
+
+import org.apache.spark.sql.connector.catalog.CatalogPlugin
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import java.util
+
+import scala.collection.JavaConverters._
+
/**
 * Owns the Fluss [[Connection]] and [[Admin]] used by the Spark catalog. Call
 * [[initFlussClient]] before using [[admin]]; [[close]] releases both handles.
 */
trait WithFlussAdmin extends AutoCloseable {

  private var _connection: Connection = _
  private var _admin: Admin = _

  // TODO: init lake spark catalog
  protected var lakeCatalog: CatalogPlugin = _

  /** Builds the Fluss connection and admin client from the catalog options. */
  protected def initFlussClient(options: CaseInsensitiveStringMap): Unit = {
    // CaseInsensitiveStringMap is a java.util.Map[String, String],
    // so its entries can be copied in one shot.
    val flussConfigs = new util.HashMap[String, String](options)

    _connection = ConnectionFactory.createConnection(FlussConfiguration.fromMap(flussConfigs))
    _admin = _connection.getAdmin
  }

  /** The admin client; fails if [[initFlussClient]] has not run yet. */
  protected def admin: Admin = {
    Preconditions.checkNotNull(_admin, "Fluss Admin is not initialized.")
    _admin
  }

  /** Quietly releases the admin client and the underlying connection. */
  override def close(): Unit = {
    IOUtils.closeQuietly(_admin, "fluss-admin")
    IOUtils.closeQuietly(_connection, "fluss-connection")
  }

}
diff --git
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/types/FlussToSparkTypeVisitor.scala
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/types/FlussToSparkTypeVisitor.scala
new file mode 100644
index 000000000..f9df1e9c0
--- /dev/null
+++
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/types/FlussToSparkTypeVisitor.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.types
+
+import org.apache.fluss.types._
+
+import org.apache.spark.sql.types.{DataType => SparkDataType, DataTypes =>
SparkDataTypes}
+
+import scala.collection.JavaConverters._
+
/**
 * Converts Fluss data types to Spark SQL data types via the Fluss
 * [[DataTypeVisitor]] double-dispatch interface. Nested types (array/map/row)
 * recurse through `accept(this)`.
 */
object FlussToSparkTypeVisitor extends DataTypeVisitor[SparkDataType] {

  // Fixed-length CHAR(n) keeps its length on the Spark side.
  override def visit(charType: CharType): SparkDataType = {
    org.apache.spark.sql.types.CharType(charType.getLength)
  }

  override def visit(stringType: StringType): SparkDataType = {
    SparkDataTypes.StringType
  }

  override def visit(booleanType: BooleanType): SparkDataType = {
    SparkDataTypes.BooleanType
  }

  // Both fixed-length BINARY and variable-length BYTES map to Spark's single
  // BinaryType, so the fixed length is not preserved.
  override def visit(binaryType: BinaryType): SparkDataType = {
    SparkDataTypes.BinaryType
  }

  override def visit(bytesType: BytesType): SparkDataType = {
    SparkDataTypes.BinaryType
  }

  override def visit(decimalType: DecimalType): SparkDataType = {
    org.apache.spark.sql.types.DecimalType(decimalType.getPrecision, decimalType.getScale)
  }

  override def visit(tinyIntType: TinyIntType): SparkDataType = {
    SparkDataTypes.ByteType
  }

  override def visit(smallIntType: SmallIntType): SparkDataType = {
    SparkDataTypes.ShortType
  }

  override def visit(intType: IntType): SparkDataType = {
    SparkDataTypes.IntegerType
  }

  override def visit(bigIntType: BigIntType): SparkDataType = {
    SparkDataTypes.LongType
  }

  override def visit(floatType: FloatType): SparkDataType = {
    SparkDataTypes.FloatType
  }

  override def visit(doubleType: DoubleType): SparkDataType = {
    SparkDataTypes.DoubleType
  }

  override def visit(dateType: DateType): SparkDataType = {
    SparkDataTypes.DateType
  }

  // Spark has no TIME type; TIME is surfaced as a plain integer. Presumably the
  // value is time-of-day in the unit Fluss encodes internally — TODO confirm.
  override def visit(timeType: TimeType): SparkDataType = {
    SparkDataTypes.IntegerType
  }

  // Timestamp without time zone -> Spark's TIMESTAMP_NTZ.
  override def visit(timestampType: TimestampType): SparkDataType = {
    SparkDataTypes.TimestampNTZType
  }

  // Timestamp with local time zone -> Spark's (session-tz) TIMESTAMP.
  override def visit(localZonedTimestampType: LocalZonedTimestampType): SparkDataType = {
    SparkDataTypes.TimestampNTZType
  }

  override def visit(arrayType: ArrayType): SparkDataType = {
    SparkDataTypes.createArrayType(
      arrayType.getElementType.accept(this),
      // Spark's containsNull mirrors the element type's nullability.
      arrayType.getElementType.isNullable)
  }

  override def visit(mapType: MapType): SparkDataType = {
    SparkDataTypes.createMapType(
      mapType.getKeyType.accept(this),
      mapType.getValueType.accept(this),
      // Spark's valueContainsNull mirrors the value type's nullability.
      mapType.getValueType.isNullable
    )
  }

  // Row -> StructType, carrying each field's name, converted type, and nullability.
  override def visit(rowType: RowType): SparkDataType = {
    val sparkFields = rowType.getFields.asScala.map {
      flussField =>
        SparkDataTypes.createStructField(
          flussField.getName,
          flussField.getType.accept(this),
          flussField.getType.isNullable)
    }
    SparkDataTypes.createStructType(sparkFields.toArray)
  }
}
diff --git
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/types/SparkToFlussTypeVisitor.scala
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/types/SparkToFlussTypeVisitor.scala
new file mode 100644
index 000000000..ee7e90633
--- /dev/null
+++
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/types/SparkToFlussTypeVisitor.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.types
+
+import org.apache.fluss.types.{ArrayType => FlussArrayType, DataField =>
FlussDataField, DataType => FlussDataType, MapType => FlussMapType, _}
+
+import org.apache.spark.sql.types.{ArrayType => SparkArrayType, DataType =>
SparkDataType, MapType => SparkMapType, StructType, UserDefinedType}
+
+import scala.collection.JavaConverters._
+
/**
 * Converts Spark SQL data types to Fluss data types. Complex types (struct/map/array)
 * recurse; user-defined types are rejected.
 */
object SparkToFlussTypeVisitor {

  /** Entry point: dispatches on the Spark type category. */
  def visit(dataType: SparkDataType): FlussDataType = {
    dataType match {
      case st: StructType =>
        visitStructType(st)
      case mt: SparkMapType =>
        visitMapType(mt)
      case at: SparkArrayType =>
        visitArrayType(at)
      case _: UserDefinedType[_] =>
        throw new UnsupportedOperationException("User-defined type is not supported")
      case t =>
        visitPrimitiveType(t)
    }
  }

  private def visitStructType(st: StructType): RowType = {
    val flussDataFields = st.fields.map {
      field =>
        // Fix: propagate the Spark field's nullability onto the Fluss type,
        // consistent with how visitMapType/visitArrayType carry
        // valueContainsNull/containsNull. Previously non-null struct fields
        // silently became nullable.
        val flussDataType = visit(field.dataType).copy(field.nullable)
        new FlussDataField(field.name, flussDataType, field.getComment().orNull)
    }
    new RowType(flussDataFields.toList.asJava)
  }

  private def visitMapType(mt: SparkMapType): FlussMapType = {
    // Map keys are always non-null in Spark; only the value carries nullability.
    new FlussMapType(visit(mt.keyType), visit(mt.valueType).copy(mt.valueContainsNull))
  }

  private def visitArrayType(at: SparkArrayType): FlussArrayType = {
    new FlussArrayType(visit(at.elementType).copy(at.containsNull))
  }

  private def visitPrimitiveType(t: SparkDataType): FlussDataType = {
    t match {
      case _: org.apache.spark.sql.types.BooleanType =>
        new BooleanType()
      case _: org.apache.spark.sql.types.ByteType =>
        new TinyIntType()
      case _: org.apache.spark.sql.types.ShortType =>
        new SmallIntType()
      case _: org.apache.spark.sql.types.IntegerType =>
        new IntType()
      case _: org.apache.spark.sql.types.LongType =>
        new BigIntType()
      case _: org.apache.spark.sql.types.FloatType =>
        new FloatType()
      case _: org.apache.spark.sql.types.DoubleType =>
        new DoubleType()
      case dt: org.apache.spark.sql.types.DecimalType =>
        new DecimalType(dt.precision, dt.scale)
      case _: org.apache.spark.sql.types.BinaryType =>
        new BytesType()
      // NOTE(review): VARCHAR length is dropped here — confirm Fluss has no
      // bounded-length string type to preserve it.
      case _: org.apache.spark.sql.types.VarcharType =>
        new StringType()
      case ct: org.apache.spark.sql.types.CharType =>
        new CharType(ct.length)
      case _: org.apache.spark.sql.types.StringType =>
        new StringType()
      case _: org.apache.spark.sql.types.DateType =>
        new DateType()
      case _: org.apache.spark.sql.types.TimestampType =>
        // spark only support 6 digits of precision
        new LocalZonedTimestampType(6)
      case _: org.apache.spark.sql.types.TimestampNTZType =>
        // spark only support 6 digits of precision
        new TimestampType(6)
      case _ =>
        throw new UnsupportedOperationException(s"Data type(${t.catalogString}) is not supported")
    }
  }

}
diff --git
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/FlussIdentityTransform.scala
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/FlussIdentityTransform.scala
new file mode 100644
index 000000000..9d9b4a5c3
--- /dev/null
+++
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/FlussIdentityTransform.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.connector.expressions.{FieldReference,
IdentityTransform, Transform}
+
/**
 * Extractor for Spark identity partition transforms: matches `identity(col)` and
 * yields the referenced column's name parts. Any other transform yields None.
 */
object FlussIdentityTransform {

  def unapply(transform: Transform): Option[Seq[String]] =
    Some(transform).collect {
      case IdentityTransform(FieldReference(parts)) => parts
    }
}
diff --git a/fluss-spark/fluss-spark-ut/pom.xml
b/fluss-spark/fluss-spark-ut/pom.xml
new file mode 100644
index 000000000..7592c6ffd
--- /dev/null
+++ b/fluss-spark/fluss-spark-ut/pom.xml
@@ -0,0 +1,125 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.fluss</groupId>
+ <artifactId>fluss-spark</artifactId>
+ <version>0.9-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>fluss-spark-ut_${scala.binary.version}</artifactId>
+ <name>Fluss : Engine Spark : UT</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.fluss</groupId>
+ <artifactId>fluss-spark-common_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.fluss</groupId>
+ <artifactId>fluss-server</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.fluss</groupId>
+ <artifactId>fluss-server</artifactId>
+ <version>${project.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.fluss</groupId>
+ <artifactId>fluss-client</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.fluss</groupId>
+ <artifactId>fluss-test-utils</artifactId>
+ </dependency>
+
+ <!-- for curator TestingServer -->
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-test</artifactId>
+ <version>${curator.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+
+        <!-- NOTE(review): JUnit 3.8.1 is the old Maven-archetype default; the
+             test base classes in this module use ScalaTest and JUnit 5
+             (@RegisterExtension), so this version looks like a stale leftover.
+             Confirm it is actually needed, and upgrade or remove it. -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>3.8.1</version>
+            <scope>test</scope>
+        </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>prepare-test-jar</id>
+ <phase>test-compile</phase>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git
a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussCatalogTest.scala
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussCatalogTest.scala
new file mode 100644
index 000000000..a9022fff4
--- /dev/null
+++
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussCatalogTest.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark
+
+import org.apache.fluss.metadata.{DatabaseDescriptor, Schema, TableDescriptor,
TablePath}
+import org.apache.fluss.types.{DataTypes, RowType}
+
+import org.apache.spark.sql.Row
+import org.assertj.core.api.Assertions.{assertThat, assertThatList}
+
+import scala.collection.JavaConverters._
+
/**
 * End-to-end catalog tests: runs Spark SQL DDL through the Fluss catalog against an
 * embedded Fluss cluster (see [[FlussSparkTestBase]]) and cross-checks the resulting
 * metadata via the Fluss admin client.
 */
class FlussCatalogTest extends FlussSparkTestBase {

  test("Catalog: namespaces") {
    // Always a default database 'fluss'.
    checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) :: Nil)

    sql("CREATE DATABASE testdb COMMENT 'created by spark'")
    checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) :: Row("testdb") :: Nil)

    // The Owner row is filtered out because its value is environment-dependent.
    checkAnswer(
      sql("DESC DATABASE testdb").filter("info_name != 'Owner'"),
      Row("Catalog Name", "fluss_catalog") :: Row("Namespace Name", "testdb") :: Row(
        "Comment",
        "created by spark") :: Nil
    )

    sql("DROP DATABASE testdb")
    checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) :: Nil)
  }

  test("Catalog: basic table") {
    // Create via Spark SQL, then verify the metadata through the Fluss admin API.
    sql(s"CREATE TABLE $DEFAULT_DATABASE.test_tbl (id int, name string) COMMENT 'my test table'")
    checkAnswer(sql("SHOW TABLES"), Row(DEFAULT_DATABASE, "test_tbl", false) :: Nil)
    checkAnswer(sql("DESC test_tbl"), Row("id", "int", null) :: Row("name", "string", null) :: Nil)

    val testTable = admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, "test_tbl")).get()
    assertThat(testTable.getTablePath.getTableName).isEqualTo("test_tbl")
    assertThat(testTable.getComment.orElse(null)).isEqualTo("my test table")
    assertThat(testTable.getRowType).isEqualTo(
      RowType.builder().field("id", DataTypes.INT()).field("name", DataTypes.STRING()).build())

    // Partitioned table with a custom table property.
    sql(s"""
           |CREATE TABLE $DEFAULT_DATABASE.test_pt_tbl (id int, name string, pt string)
           |PARTITIONED BY (pt)
           |TBLPROPERTIES("key" = "value")
           |""".stripMargin)

    val testPartitionedTable =
      admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, "test_pt_tbl")).get()
    assertThat(testPartitionedTable.getRowType).isEqualTo(
      RowType
        .builder()
        .field("id", DataTypes.INT())
        .field("name", DataTypes.STRING())
        .field("pt", DataTypes.STRING())
        .build())
    assertThat(testPartitionedTable.getPartitionKeys.get(0)).isEqualTo("pt")
    // Unrecognized TBLPROPERTIES land in the Fluss custom properties.
    assertThat(testPartitionedTable.getCustomProperties.containsKey("key")).isEqualTo(true)
    assertThat(
      testPartitionedTable.getCustomProperties.getRawValue("key").get().asInstanceOf[String])
      .isEqualTo("value")

    sql("DROP TABLE test_tbl")
    sql("DROP TABLE test_pt_tbl")
    checkAnswer(sql("SHOW TABLES"), Nil)
  }

  test("Catalog: primary-key table") {
    // Primary key declared via the "primary.key" table property.
    sql(s"""
           |CREATE TABLE $DEFAULT_DATABASE.test_tbl (id int, name string, pt string)
           |PARTITIONED BY (pt)
           |TBLPROPERTIES("primary.key" = "id,pt")
           |""".stripMargin)

    val tbl1 = admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, "test_tbl")).get()
    assertThatList(tbl1.getPrimaryKeys).hasSameElementsAs(Seq("id", "pt").toList.asJava)
    // Defaults when unspecified: one bucket, bucket key derived from the
    // primary key minus the partition keys.
    assertThat(tbl1.getNumBuckets).isEqualTo(1)
    assertThat(tbl1.getBucketKeys.contains("id")).isEqualTo(true)
    assertThat(tbl1.getPartitionKeys.contains("pt")).isEqualTo(true)

    // Explicit bucket count and bucket key.
    sql(
      s"""
         |CREATE TABLE $DEFAULT_DATABASE.test_tbl2 (pk1 int, pk2 long, name string, pt1 string, pt2 string)
         |PARTITIONED BY (pt1, pt2)
         |TBLPROPERTIES("primary.key" = "pk1,pk2,pt1,pt2", "bucket.num" = 3, "bucket.key" = "pk1")
         |""".stripMargin)

    val tbl2 = admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, "test_tbl2")).get()
    assertThatList(tbl2.getPrimaryKeys).hasSameElementsAs(
      Seq("pk1", "pk2", "pt1", "pt2").toList.asJava)
    assertThat(tbl2.getNumBuckets).isEqualTo(3)
    assertThatList(tbl2.getBucketKeys).hasSameElementsAs(Seq("pk1").toList.asJava)
  }

  test("Catalog: check namespace and table created by admin") {
    // Inverse direction: create via the admin client, observe through Spark SQL.
    val dbDesc = DatabaseDescriptor.builder().comment("created by admin").build()
    admin.createDatabase("db_by_admin", dbDesc, true).get()
    checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) :: Row("db_by_admin") :: Nil)

    sql("USE db_by_admin")
    val tablePath = TablePath.of("db_by_admin", "tbl_by_admin")
    val rt = RowType
      .builder()
      .field("id", DataTypes.INT())
      .field("name", DataTypes.STRING())
      .field("pt", DataTypes.STRING())
      .build()
    val tableDesc = TableDescriptor
      .builder()
      .schema(Schema.newBuilder().fromRowType(rt).build())
      .partitionedBy("pt")
      .build()
    admin.createTable(tablePath, tableDesc, false).get()
    checkAnswer(sql("SHOW TABLES"), Row("db_by_admin", "tbl_by_admin", false) :: Nil)
    checkAnswer(
      sql("DESC tbl_by_admin"),
      Row("id", "int", null) :: Row("name", "string", null) :: Row("pt", "string", null) :: Nil)

    admin.dropTable(tablePath, true).get()
    checkAnswer(sql("SHOW TABLES"), Nil)

    admin.dropDatabase("db_by_admin", true, true).get()
    checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) :: Nil)
  }
}
diff --git
a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
new file mode 100644
index 000000000..08b6d4600
--- /dev/null
+++
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark
+
+import org.apache.fluss.client.{Connection, ConnectionFactory}
+import org.apache.fluss.client.admin.Admin
+import org.apache.fluss.config.{ConfigOptions, Configuration}
+import org.apache.fluss.server.testutils.FlussClusterExtension
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.test.SharedSparkSession
+import org.junit.jupiter.api.extension.RegisterExtension
+import org.scalactic.source.Position
+import org.scalatest.Tag
+
+import java.time.Duration
+
/**
 * Base class for Fluss/Spark integration tests: wires a Spark session to the embedded
 * Fluss cluster (via the `fluss_catalog` catalog) and provides a Fluss admin client
 * for metadata cross-checks.
 */
class FlussSparkTestBase extends QueryTest with SharedSparkSession {

  import FlussSparkTestBase._

  protected val DEFAULT_DATABASE = "fluss"

  protected var conn: Connection = _
  protected var admin: Admin = _

  override protected def sparkConf: SparkConf = {
    super.sparkConf
      .set("spark.sql.catalog.fluss_catalog", classOf[SparkCatalog].getName)
      .set("spark.sql.catalog.fluss_catalog.bootstrap.servers", bootstrapServers)
      .set("spark.sql.defaultCatalog", "fluss_catalog")
  }

  override protected def beforeAll(): Unit = {
    super.beforeAll()
    conn = ConnectionFactory.createConnection(clientConf)
    admin = conn.getAdmin

    sql(s"USE $DEFAULT_DATABASE")
  }

  // Fix: the admin client and connection opened in beforeAll were never released,
  // leaking client resources across test suites. (Also dropped a leftover debug
  // println of the test name in an otherwise no-op `test` override.)
  override protected def afterAll(): Unit = {
    try {
      if (admin != null) admin.close()
      if (conn != null) conn.close()
    } finally {
      super.afterAll()
    }
  }

}
+
// Companion holding the shared embedded Fluss cluster for all suites.
// NOTE(review): @RegisterExtension on a Scala object is only honored when tests run
// under the JUnit 5 engine — these suites extend ScalaTest's QueryTest, so the
// extension lifecycle likely never fires; confirm. The cluster is instead started
// eagerly in the object initializer below, and nothing visible here ever stops it
// (the JVM exit presumably tears it down) — confirm intended.
@RegisterExtension
object FlussSparkTestBase {
  // Three tablet servers with a fast KV snapshot interval so snapshots occur
  // within test timeframes.
  val FLUSS_CLUSTER_EXTENSION: FlussClusterExtension =
    FlussClusterExtension.builder
      .setClusterConf(
        new Configuration()
          .set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1))
      )
      .setNumOfTabletServers(3)
      .build

  // Side effect at object initialization: the cluster starts the first time this
  // object is referenced (e.g. when clientConf is read in beforeAll).
  FLUSS_CLUSTER_EXTENSION.start()

  val clientConf: Configuration = FLUSS_CLUSTER_EXTENSION.getClientConfig
  val bootstrapServers: String = FLUSS_CLUSTER_EXTENSION.getBootstrapServers
}
diff --git a/fluss-spark/pom.xml b/fluss-spark/pom.xml
new file mode 100644
index 000000000..777352809
--- /dev/null
+++ b/fluss-spark/pom.xml
@@ -0,0 +1,337 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.fluss</groupId>
+ <artifactId>fluss</artifactId>
+ <version>0.9-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>fluss-spark</artifactId>
+ <name>Fluss : Engine Spark :</name>
+ <packaging>pom</packaging>
+
+ <modules>
+ <module>fluss-spark-common</module>
+ <module>fluss-spark-ut</module>
+ <module>fluss-spark-3.5</module>
+ <module>fluss-spark-3.4</module>
+ </modules>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.fluss</groupId>
+ <artifactId>fluss-client</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.fluss</groupId>
+ <artifactId>fluss-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-reflect</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-compiler</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.binary.version}</artifactId>
+ <version>3.1.0</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j2-impl</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+                    <!-- SPARK-40511 upgrades SLF4J2, which is not compatible w/ SLF4J1 -->
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j2-impl</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-hive_${scala.binary.version}</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j2-impl</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j2-impl</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
+ <build>
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.scalastyle</groupId>
+ <artifactId>scalastyle-maven-plugin</artifactId>
+ <version>1.0.0</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <verbose>false</verbose>
+ <failOnViolation>true</failOnViolation>
+
<includeTestSourceDirectory>true</includeTestSourceDirectory>
+ <failOnWarning>false</failOnWarning>
+
<sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
+
<testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
+
<outputFile>${project.basedir}/target/scalastyle-output.xml</outputFile>
+ <inputEncoding>UTF-8</inputEncoding>
+ <outputEncoding>UTF-8</outputEncoding>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.8.0</version>
+ <configuration>
+ <source>${target.java.version}</source>
+ <target>${target.java.version}</target>
+
<useIncrementalCompilation>false</useIncrementalCompilation>
+ <compilerArgs>
+ <arg>-Xpkginfo:always</arg>
+ <arg>-Xlint:deprecation</arg>
+ </compilerArgs>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <version>3.2.2</version>
+ <executions>
+                        <!-- Run the scala compiler in the process-resources phase, so that
+                            dependencies on scala classes can be resolved later in the (Java) compile phase -->
+ <execution>
+ <id>scala-compile-first</id>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>add-source</goal>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+
+ <!-- Run scala compiler in the process-test-resources
phase, so that dependencies on
+ scala classes can be resolved later in the (Java)
test-compile phase -->
+ <execution>
+ <id>scala-test-compile</id>
+ <phase>process-test-resources</phase>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <scalaVersion>${scala.version}</scalaVersion>
+
<checkMultipleScalaVersions>false</checkMultipleScalaVersions>
+ <args>
+ <arg>-nobootcp</arg>
+ <arg>-target:jvm-${target.java.version}</arg>
+ </args>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ <version>2.1.0</version>
+ <configuration>
+
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+ <junitxml>.</junitxml>
+ <argLine>-ea -Xmx4g -Xss4m -XX:MaxMetaspaceSize=2g
-XX:ReservedCodeCacheSize=128m ${extraJavaTestArgs}
-Dio.netty.tryReflectionSetAccessible=true</argLine>
+ <filereports>FlussTestSuite.txt</filereports>
+ <forkMode>once</forkMode>
+ <noScalaTestIgnore>true</noScalaTestIgnore>
+ </configuration>
+ <executions>
+ <execution>
+ <id>test</id>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ </plugin>
+
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ </plugin>
+
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
\ No newline at end of file
diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml
index 7f0a4cc9b..16e942860 100644
--- a/fluss-test-coverage/pom.xml
+++ b/fluss-test-coverage/pom.xml
@@ -126,6 +126,7 @@
<exclude>fluss-test-coverage/**</exclude>
<exclude>fluss-test-utils/**</exclude>
<exclude>fluss-flink/**</exclude>
+                        <exclude>fluss-spark/**</exclude>
<exclude>fluss-lake/**</exclude>
</excludes>
</resource>
@@ -179,6 +180,44 @@
</build>
</profile>
+ <profile>
+ <id>test-spark3</id>
+ <build>
+ <plugins>
+ <!-- required by jacoco for the goal: check to work -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-resources-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>copy-class-files</id>
+ <phase>generate-resources</phase>
+ <goals>
+ <goal>copy-resources</goal>
+ </goals>
+ <configuration>
+ <overwrite>false</overwrite>
+ <resources>
+ <resource>
+
<directory>${project.basedir}/../</directory>
+ <includes>
+
<include>fluss-spark/**/target/classes/**</include>
+ </includes>
+ <excludes>
+
<exclude>fluss-test-coverage/**</exclude>
+
<exclude>fluss-test-utils/**</exclude>
+ </excludes>
+ </resource>
+ </resources>
+
<outputDirectory>${project.build.directory}/classes</outputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+
<profile>
<id>test-lake</id>
<build>
@@ -288,6 +327,8 @@
</limit>
</limits>
<excludes>
+
<exclude>org.apache.fluss.spark.*</exclude>
+
<exclude>org.apache.spark.sql.*</exclude>
<exclude>org.apache.fluss.protogen.*</exclude>
<exclude>org.apache.fluss.memory.*</exclude>
<exclude>org.apache.fluss.utils.*</exclude>
diff --git a/pom.xml b/pom.xml
index 247040141..99558b790 100644
--- a/pom.xml
+++ b/pom.xml
@@ -61,6 +61,7 @@
<module>fluss-test-coverage</module>
<module>fluss-server</module>
<module>fluss-flink</module>
+ <module>fluss-spark</module>
<module>fluss-protogen</module>
<module>fluss-jmh</module>
<module>fluss-lake</module>
@@ -91,6 +92,13 @@
<paimon.version>1.3.1</paimon.version>
<iceberg.version>1.10.0</iceberg.version>
+ <!-- spark & scala -->
+ <scala212.version>2.12.18</scala212.version>
+ <scala213.version>2.13.16</scala213.version>
+ <scala.binary.version>2.12</scala.binary.version>
+ <scala.version>${scala212.version}</scala.version>
+ <spark.version>3.5.7</spark.version>
+
<fluss.hadoop.version>3.4.0</fluss.hadoop.version>
<frocksdb.version>6.20.3-ververica-2.0</frocksdb.version>
<slf4j.version>1.7.36</slf4j.version>
@@ -1007,6 +1015,17 @@
<removeUnusedImports/>
</java>
+ <scala>
+ <scalafmt>
+ <version>3.10.2</version>
+
<file>${maven.multiModuleProjectDirectory}/.scalafmt.conf</file>
+ </scalafmt>
+
+ <licenseHeader>
+
<file>${maven.multiModuleProjectDirectory}/copyright.txt</file>
+ <delimiter>${spotless.delimiter}</delimiter>
+ </licenseHeader>
+ </scala>
</configuration>
<executions>
<execution>