This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 5ed3b37b9 [spark] Initiate fluss-spark and introduce spark catalog and 
table (#2219)
5ed3b37b9 is described below

commit 5ed3b37b95f9a596e3fc8b4dbc41e1e76518dc6f
Author: Yann Byron <[email protected]>
AuthorDate: Fri Dec 26 20:30:27 2025 +0800

    [spark] Initiate fluss-spark and introduce spark catalog and table (#2219)
---
 .github/workflows/ci-template.yaml                 |   2 +-
 .github/workflows/stage.sh                         |  22 +-
 .scalafmt.conf                                     |  70 +++++
 copyright.txt                                      |  17 ++
 .../org/apache/fluss/config/Configuration.java     |   4 +-
 fluss-spark/fluss-spark-3.4/pom.xml                | 129 ++++++++
 fluss-spark/fluss-spark-3.5/pom.xml                | 129 ++++++++
 fluss-spark/fluss-spark-common/pom.xml             |  71 +++++
 .../org/apache/fluss/spark/SparkCatalog.scala      | 109 +++++++
 .../apache/fluss/spark/SparkConnectorOptions.scala |  61 ++++
 .../org/apache/fluss/spark/SparkConversions.scala  | 100 ++++++
 .../scala/org/apache/fluss/spark/SparkTable.scala  |  25 ++
 .../fluss/spark/catalog/AbstractSparkTable.scala   |  43 +++
 .../spark/catalog/SupportsFlussNamespaces.scala    |  95 ++++++
 .../catalog/SupportsFlussPartitionManagement.scala |  53 ++++
 .../fluss/spark/catalog/WithFlussAdmin.scala       |  60 ++++
 .../spark/types/FlussToSparkTypeVisitor.scala      | 116 +++++++
 .../spark/types/SparkToFlussTypeVisitor.scala      |  99 ++++++
 .../apache/spark/sql/FlussIdentityTransform.scala  |  31 ++
 fluss-spark/fluss-spark-ut/pom.xml                 | 125 ++++++++
 .../org/apache/fluss/spark/FlussCatalogTest.scala  | 142 +++++++++
 .../apache/fluss/spark/FlussSparkTestBase.scala    |  81 +++++
 fluss-spark/pom.xml                                | 337 +++++++++++++++++++++
 fluss-test-coverage/pom.xml                        |  41 +++
 pom.xml                                            |  19 ++
 25 files changed, 1978 insertions(+), 3 deletions(-)

diff --git a/.github/workflows/ci-template.yaml 
b/.github/workflows/ci-template.yaml
index c5f77af3a..7179b0c68 100644
--- a/.github/workflows/ci-template.yaml
+++ b/.github/workflows/ci-template.yaml
@@ -36,7 +36,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        module: [ core, flink, lake ]
+        module: [ core, flink, spark3, lake ]
     name: "${{ matrix.module }}"
     steps:
       - name: Checkout code
diff --git a/.github/workflows/stage.sh b/.github/workflows/stage.sh
index 18587dc29..b35b74e47 100755
--- a/.github/workflows/stage.sh
+++ b/.github/workflows/stage.sh
@@ -19,6 +19,7 @@
 
 STAGE_CORE="core"
 STAGE_FLINK="flink"
+STAGE_SPARK="spark3"
 STAGE_LAKE="lake"
 
 MODULES_FLINK="\
@@ -28,6 +29,20 @@ fluss-flink/fluss-flink-2.2,\
 fluss-flink/fluss-flink-1.20,\
 "
 
+MODULES_COMMON_SPARK="\
+fluss-spark,\
+fluss-spark/fluss-spark-common,\
+fluss-spark/fluss-spark-ut,\
+"
+
+MODULES_SPARK3="\
+fluss-spark,\
+fluss-spark/fluss-spark-common,\
+fluss-spark/fluss-spark-ut,\
+fluss-spark/fluss-spark-3.5,\
+fluss-spark/fluss-spark-3.4,\
+"
+
 # we move Flink legacy version tests to "lake" module for balancing testing 
time
 MODULES_LAKE="\
 fluss-flink/fluss-flink-1.19,\
@@ -42,10 +57,12 @@ function get_test_modules_for_stage() {
     local stage=$1
 
     local modules_flink=$MODULES_FLINK
+    local modules_spark3=$MODULES_SPARK3
     local modules_lake=$MODULES_LAKE
     local negated_flink=\!${MODULES_FLINK//,/,\!}
+    local negated_spark=\!${MODULES_COMMON_SPARK//,/,\!}
     local negated_lake=\!${MODULES_LAKE//,/,\!}
-    local modules_core="$negated_flink,$negated_lake"
+    local modules_core="$negated_flink,$negated_spark,$negated_lake"
 
     case ${stage} in
         (${STAGE_CORE})
@@ -54,6 +71,9 @@ function get_test_modules_for_stage() {
         (${STAGE_FLINK})
             echo "-pl fluss-test-coverage,$modules_flink"
         ;;
+        (${STAGE_SPARK})
+            echo "-Pspark3 -pl fluss-test-coverage,$modules_spark3"
+        ;;
         (${STAGE_LAKE})
             echo "-pl fluss-test-coverage,$modules_lake"
         ;;
diff --git a/.scalafmt.conf b/.scalafmt.conf
new file mode 100644
index 000000000..7a7c55f0f
--- /dev/null
+++ b/.scalafmt.conf
@@ -0,0 +1,70 @@
+runner.dialect = scala212
+
+# Version is required to make sure IntelliJ picks the right version
+version = 3.10.2
+preset = default
+
+# Max column
+maxColumn = 100
+
+# This parameter simply says the .stripMargin method was not redefined by the 
user to assign
+# special meaning to indentation preceding the | character. Hence, that 
indentation can be modified.
+assumeStandardLibraryStripMargin = true
+align.stripMargin = true
+
+# Align settings
+align.preset = none
+align.closeParenSite = false
+align.openParenCallSite = false
+danglingParentheses.defnSite = false
+danglingParentheses.callSite = false
+danglingParentheses.ctrlSite = true
+danglingParentheses.tupleSite = false
+align.openParenCallSite = false
+align.openParenDefnSite = false
+align.openParenTupleSite = false
+
+# Newlines
+newlines.alwaysBeforeElseAfterCurlyIf = false
+newlines.beforeCurlyLambdaParams = multiline # Newline before lambda params
+newlines.afterCurlyLambdaParams = squash # No newline after lambda params
+newlines.inInterpolation = "avoid"
+newlines.avoidInResultType = true
+optIn.annotationNewlines = true
+
+# Scaladoc
+docstrings.style = Asterisk # Javadoc style
+docstrings.removeEmpty = true
+docstrings.oneline = fold
+docstrings.forceBlankLineBefore = true
+
+# Indentation
+indent.extendSite = 2 # This makes sure extend is not indented as the ctor 
parameters
+
+# Rewrites
+rewrite.rules = [AvoidInfix, Imports, RedundantBraces, SortModifiers]
+
+# Imports
+rewrite.imports.sort = scalastyle
+rewrite.imports.groups = [
+    ["org.apache.fluss\\..*"],
+    ["org.apache.fluss.shade\\..*"],
+    [".*"],
+    ["javax\\..*"],
+    ["java\\..*"],
+    ["scala\\..*"]
+]
+rewrite.imports.contiguousGroups = no
+importSelectors = singleline # Imports in a single line, like IntelliJ
+
+# Remove redundant braces in string interpolation.
+rewrite.redundantBraces.stringInterpolation = true
+rewrite.redundantBraces.defnBodies = false
+rewrite.redundantBraces.generalExpressions = false
+rewrite.redundantBraces.ifElseExpressions = false
+rewrite.redundantBraces.methodBodies = false
+rewrite.redundantBraces.includeUnitMethods = false
+rewrite.redundantBraces.maxBreaks = 1
+
+# Remove trailing commas
+rewrite.trailingCommas.style = "never"
diff --git a/copyright.txt b/copyright.txt
new file mode 100644
index 000000000..29400e587
--- /dev/null
+++ b/copyright.txt
@@ -0,0 +1,17 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/config/Configuration.java 
b/fluss-common/src/main/java/org/apache/fluss/config/Configuration.java
index a7ee42f75..2c985f5a1 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/Configuration.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/Configuration.java
@@ -18,6 +18,7 @@
 package org.apache.fluss.config;
 
 import org.apache.fluss.annotation.PublicStable;
+import org.apache.fluss.annotation.VisibleForTesting;
 import org.apache.fluss.utils.CollectionUtils;
 
 import org.slf4j.Logger;
@@ -643,7 +644,8 @@ public class Configuration implements Serializable, 
ReadableConfig {
         }
     }
 
-    private Optional<Object> getRawValue(String key) {
+    @VisibleForTesting
+    public Optional<Object> getRawValue(String key) {
         if (key == null) {
             throw new NullPointerException("Key must not be null.");
         }
diff --git a/fluss-spark/fluss-spark-3.4/pom.xml 
b/fluss-spark/fluss-spark-3.4/pom.xml
new file mode 100644
index 000000000..a5d10f39a
--- /dev/null
+++ b/fluss-spark/fluss-spark-3.4/pom.xml
@@ -0,0 +1,129 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.fluss</groupId>
+        <artifactId>fluss-spark</artifactId>
+        <version>0.9-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>fluss-spark-3.4_${scala.binary.version}</artifactId>
+    <name>Fluss : Engine Spark : 3.4</name>
+
+    <properties>
+        <spark.version>3.4.3</spark.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.fluss</groupId>
+            <artifactId>fluss-spark-common_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.fluss</groupId>
+            <artifactId>fluss-spark-ut_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <!-- compilation of main sources -->
+                    <skipMain>${skip.on.java8}</skipMain>
+                    <!-- compilation of test sources -->
+                    <skip>${skip.on.java8}</skip>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>3.0.0-M5</version>
+                <executions>
+                    <!-- Test end with ITCase is e2e test in this module   -->
+                    <execution>
+                        <id>integration-tests</id>
+                        <phase>integration-test</phase>
+                        <inherited>false</inherited>
+                        <goals>
+                            <goal>test</goal>
+                        </goals>
+                        <configuration>
+                            <skip>${skip.on.java8}</skip>
+                            <includes>
+                                <include>**/*ITCase.*</include>
+                            </includes>
+                            <!-- e2e test with flink/zookeeper cluster, we set 
forkCount=1 -->
+                            <forkCount>1</forkCount>
+                        </configuration>
+                    </execution>
+                    <!-- others unit tests -->
+                    <execution>
+                        <id>default-test</id>
+                        <phase>test</phase>
+                        <inherited>false</inherited>
+                        <goals>
+                            <goal>test</goal>
+                        </goals>
+                        <configuration>
+                            <skip>${skip.on.java8}</skip>
+                            <excludes>
+                                <exclude>**/*ITCase.*</exclude>
+                            </excludes>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>shade-fluss</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <artifactSet>
+                                <includes combine.children="append">
+                                    
<include>org.apache.fluss:fluss-spark-common</include>
+                                    
<include>org.apache.fluss:fluss-client</include>
+                                </includes>
+                            </artifactSet>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/fluss-spark/fluss-spark-3.5/pom.xml 
b/fluss-spark/fluss-spark-3.5/pom.xml
new file mode 100644
index 000000000..1f55f0e12
--- /dev/null
+++ b/fluss-spark/fluss-spark-3.5/pom.xml
@@ -0,0 +1,129 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.fluss</groupId>
+        <artifactId>fluss-spark</artifactId>
+        <version>0.9-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>fluss-spark-3.5_${scala.binary.version}</artifactId>
+    <name>Fluss : Engine Spark : 3.5</name>
+
+    <properties>
+        <spark.version>3.5.7</spark.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.fluss</groupId>
+            <artifactId>fluss-spark-common_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.fluss</groupId>
+            <artifactId>fluss-spark-ut_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <!-- compilation of main sources -->
+                    <skipMain>${skip.on.java8}</skipMain>
+                    <!-- compilation of test sources -->
+                    <skip>${skip.on.java8}</skip>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>3.0.0-M5</version>
+                <executions>
+                    <!-- Test end with ITCase is e2e test in this module   -->
+                    <execution>
+                        <id>integration-tests</id>
+                        <phase>integration-test</phase>
+                        <inherited>false</inherited>
+                        <goals>
+                            <goal>test</goal>
+                        </goals>
+                        <configuration>
+                            <skip>${skip.on.java8}</skip>
+                            <includes>
+                                <include>**/*ITCase.*</include>
+                            </includes>
+                            <!-- e2e test with flink/zookeeper cluster, we set 
forkCount=1 -->
+                            <forkCount>1</forkCount>
+                        </configuration>
+                    </execution>
+                    <!-- others unit tests -->
+                    <execution>
+                        <id>default-test</id>
+                        <phase>test</phase>
+                        <inherited>false</inherited>
+                        <goals>
+                            <goal>test</goal>
+                        </goals>
+                        <configuration>
+                            <skip>${skip.on.java8}</skip>
+                            <excludes>
+                                <exclude>**/*ITCase.*</exclude>
+                            </excludes>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>shade-fluss</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <artifactSet>
+                                <includes combine.children="append">
+                                    
<include>org.apache.fluss:fluss-spark-common</include>
+                                    
<include>org.apache.fluss:fluss-client</include>
+                                </includes>
+                            </artifactSet>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/fluss-spark/fluss-spark-common/pom.xml 
b/fluss-spark/fluss-spark-common/pom.xml
new file mode 100644
index 000000000..e285b8bbc
--- /dev/null
+++ b/fluss-spark/fluss-spark-common/pom.xml
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.fluss</groupId>
+        <artifactId>fluss-spark</artifactId>
+        <version>0.9-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>fluss-spark-common_${scala.binary.version}</artifactId>
+    <name>Fluss : Engine Spark : Common</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>shade-fluss</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <artifactSet>
+                                <includes combine.children="append">
+                                    
<include>org.apache.fluss:fluss-spark-common</include>
+                                    
<include>org.apache.fluss:fluss-client</include>
+                                </includes>
+                            </artifactSet>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala
new file mode 100644
index 000000000..b620d2700
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark
+
+import org.apache.fluss.exception.{DatabaseNotExistException, 
TableAlreadyExistException, TableNotExistException}
+import org.apache.fluss.metadata.TablePath
+import org.apache.fluss.spark.catalog.{SupportsFlussNamespaces, WithFlussAdmin}
+
+import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, 
NoSuchTableException, TableAlreadyExistsException}
+import org.apache.spark.sql.connector.catalog.{Identifier, Table, 
TableCatalog, TableChange}
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import java.util
+import java.util.concurrent.ExecutionException
+
+import scala.collection.JavaConverters._
+
+class SparkCatalog extends TableCatalog with SupportsFlussNamespaces with 
WithFlussAdmin {
+
+  private var catalogName: String = "fluss"
+
+  override def listTables(namespace: Array[String]): Array[Identifier] = {
+    doNamespaceOperator(namespace) {
+      admin
+        .listTables(namespace.head)
+        .get()
+        .asScala
+        .map(table => Identifier.of(namespace, table))
+        .toArray
+    }
+  }
+
+  override def loadTable(ident: Identifier): Table = {
+    try {
+      SparkTable(admin.getTableInfo(toTablePath(ident)).get())
+    } catch {
+      case e: ExecutionException if 
e.getCause.isInstanceOf[TableNotExistException] =>
+        throw new NoSuchTableException(ident)
+    }
+  }
+
+  override def createTable(
+      ident: Identifier,
+      schema: StructType,
+      partitions: Array[Transform],
+      properties: util.Map[String, String]): Table = {
+    try {
+      val tableDescriptor = SparkConversions.toFlussTable(schema, partitions, 
properties)
+      admin.createTable(toTablePath(ident), tableDescriptor, false).get()
+      loadTable(ident)
+    } catch {
+      case e: ExecutionException =>
+        if (e.getCause.isInstanceOf[DatabaseNotExistException]) {
+          throw new NoSuchNamespaceException(ident.namespace())
+        } else if (e.getCause.isInstanceOf[TableAlreadyExistException]) {
+          throw new TableAlreadyExistsException(ident)
+        } else {
+          throw new RuntimeException(e)
+        }
+    }
+  }
+
+  override def alterTable(ident: Identifier, changes: TableChange*): Table = {
+    throw new UnsupportedOperationException("Altering table is not supported")
+  }
+
+  override def dropTable(ident: Identifier): Boolean = {
+    try {
+      admin.dropTable(toTablePath(ident), false).get()
+      true
+    } catch {
+      case e: ExecutionException if 
e.getCause.isInstanceOf[TableNotExistException] =>
+        throw new NoSuchTableException(ident)
+    }
+  }
+
+  override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = 
{
+    throw new UnsupportedOperationException("Renaming table is not supported")
+  }
+
+  override def initialize(name: String, options: CaseInsensitiveStringMap): 
Unit = {
+    catalogName = name;
+    initFlussClient(options)
+  }
+
+  override def name(): String = catalogName
+
+  private def toTablePath(ident: Identifier): TablePath = {
+    assert(ident.namespace().length == 1, "Only single namespace is supported")
+    TablePath.of(ident.namespace().head, ident.name)
+  }
+}
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConnectorOptions.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConnectorOptions.scala
new file mode 100644
index 000000000..71b168985
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConnectorOptions.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark
+
+import org.apache.fluss.config.{ConfigBuilder, ConfigOption}
+
+object SparkConnectorOptions {
+
+  val PRIMARY_KEY: ConfigOption[String] =
+    ConfigBuilder
+      .key("primary.key")
+      .stringType()
+      .noDefaultValue()
+      .withDescription("The primary keys of a Fluss table.")
+
+  val BUCKET_KEY: ConfigOption[String] =
+    ConfigBuilder
+      .key("bucket.key")
+      .stringType()
+      .noDefaultValue()
+      .withDescription(
+        """
+          |Specific the distribution policy of the Fluss table.
+          |Data will be distributed to each bucket according to the hash value 
of bucket-key (It must be a subset of the primary keys excluding partition keys 
of the primary key table).
+          |If you specify multiple fields, delimiter is ','.
+          |If the table has a primary key and a bucket key is not specified, 
the bucket key will be used as primary key(excluding the partition key).
+          |If the table has no primary key and the bucket key is not 
specified, the data will be distributed to each bucket randomly.
+          |""".stripMargin)
+
+  val BUCKET_NUMBER: ConfigOption[Integer] =
+    ConfigBuilder
+      .key("bucket.num")
+      .intType()
+      .noDefaultValue()
+      .withDescription("The number of buckets of a Fluss table.")
+
+  val COMMENT: ConfigOption[String] =
+    ConfigBuilder
+      .key("comment")
+      .stringType()
+      .noDefaultValue()
+      .withDescription("The comment of a Fluss table.")
+
+  val SPARK_TABLE_OPTIONS: Seq[String] =
+    Seq(PRIMARY_KEY, BUCKET_KEY, BUCKET_NUMBER, COMMENT).map(_.key)
+}
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConversions.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConversions.scala
new file mode 100644
index 000000000..f85e33f81
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConversions.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark
+
+import org.apache.fluss.config.FlussConfigUtils
+import org.apache.fluss.metadata.{Schema, TableDescriptor}
+import org.apache.fluss.spark.SparkConnectorOptions._
+import org.apache.fluss.spark.types.{FlussToSparkTypeVisitor, 
SparkToFlussTypeVisitor}
+import org.apache.fluss.types.RowType
+
+import org.apache.spark.sql.FlussIdentityTransform
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.types.StructType
+
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+object SparkConversions {
+
+  def toFlussDataType(schema: StructType): RowType =
+    SparkToFlussTypeVisitor.visit(schema).asInstanceOf[RowType]
+
+  def toSparkDataType(rowType: RowType): StructType =
+    FlussToSparkTypeVisitor.visit(rowType).asInstanceOf[StructType]
+
+  def toFlussTable(
+      sparkSchema: StructType,
+      partitions: Array[Transform],
+      properties: util.Map[String, String]): TableDescriptor = {
+    val caseInsensitiveProps = CaseInsensitiveMap(properties.asScala.toMap)
+
+    val tableDescriptorBuilder = TableDescriptor.builder()
+    val schemaBuilder = 
Schema.newBuilder().fromRowType(toFlussDataType(sparkSchema))
+
+    val partitionKey = toPartitionKeys(partitions)
+    tableDescriptorBuilder.partitionedBy(partitionKey: _*)
+
+    val primaryKeys = if (caseInsensitiveProps.contains(PRIMARY_KEY.key)) {
+      val pks = caseInsensitiveProps.get(PRIMARY_KEY.key).get.split(",")
+      schemaBuilder.primaryKey(pks: _*)
+      pks
+    } else {
+      Array.empty[String]
+    }
+
+    if (caseInsensitiveProps.contains(BUCKET_NUMBER.key)) {
+      val bucketNum = caseInsensitiveProps.get(BUCKET_NUMBER.key).get.toInt
+      val bucketKeys = if (caseInsensitiveProps.contains(BUCKET_KEY.key)) {
+        caseInsensitiveProps.get(BUCKET_KEY.key).get.split(",")
+      } else {
+        primaryKeys.filterNot(partitionKey.contains)
+      }
+      tableDescriptorBuilder.distributedBy(bucketNum, bucketKeys: _*)
+    }
+
+    if (caseInsensitiveProps.contains(COMMENT.key)) {
+      tableDescriptorBuilder.comment(caseInsensitiveProps.get(COMMENT.key).get)
+    }
+
+    val (tableProps, customProps) =
+      caseInsensitiveProps.filterNot(SPARK_TABLE_OPTIONS.contains).partition {
+        case (key, _) => key.startsWith(FlussConfigUtils.TABLE_PREFIX)
+      }
+
+    tableDescriptorBuilder
+      .schema(schemaBuilder.build())
+      .properties(tableProps.asJava)
+      .customProperties(customProps.asJava)
+      .build()
+  }
+
+  def toPartitionKeys(partitions: Array[Transform]): Array[String] = {
+    val partitionKeys = mutable.ArrayBuffer.empty[String]
+    partitions.foreach {
+      case FlussIdentityTransform(parts) if parts.length == 1 =>
+        partitionKeys += parts.head
+      case p =>
+        throw new UnsupportedOperationException("Unsupported partition 
transform: " + p)
+    }
+    partitionKeys.toArray
+  }
+}
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
new file mode 100644
index 000000000..1416ab415
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark
+
+import org.apache.fluss.metadata.TableInfo
+import org.apache.fluss.spark.catalog.{AbstractSparkTable, 
SupportsFlussPartitionManagement}
+
+case class SparkTable(table: TableInfo)
+  extends AbstractSparkTable(table)
+  with SupportsFlussPartitionManagement {}
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
new file mode 100644
index 000000000..8694c7f59
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.catalog
+
+import org.apache.fluss.metadata.TableInfo
+import org.apache.fluss.spark.SparkConversions
+
+import org.apache.spark.sql.connector.catalog.{Table, TableCapability}
+import org.apache.spark.sql.types.StructType
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+abstract class AbstractSparkTable(tableInfo: TableInfo) extends Table {
+
+  protected lazy val _schema: StructType =
+    SparkConversions.toSparkDataType(tableInfo.getSchema.getRowType)
+
+  protected lazy val _partitionSchema = new StructType(
+    _schema.fields.filter(tableInfo.getPartitionKeys.contains))
+
+  override def name(): String = tableInfo.toString
+
+  override def schema(): StructType = _schema
+
+  override def capabilities(): util.Set[TableCapability] = 
Set.empty[TableCapability].asJava
+}
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/SupportsFlussNamespaces.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/SupportsFlussNamespaces.scala
new file mode 100644
index 000000000..ddc83a54b
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/SupportsFlussNamespaces.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.catalog
+
+import org.apache.fluss.exception.DatabaseNotExistException
+import org.apache.fluss.metadata.DatabaseDescriptor
+import org.apache.fluss.utils.Preconditions
+
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException
+import org.apache.spark.sql.connector.catalog.{NamespaceChange, 
SupportsNamespaces}
+
+import java.util
+import java.util.concurrent.ExecutionException
+
+import scala.collection.JavaConverters._
+
+trait SupportsFlussNamespaces extends SupportsNamespaces with WithFlussAdmin {
+
+  override def listNamespaces(): Array[Array[String]] = {
+    admin.listDatabases.get.asScala.map(Array(_)).toArray
+  }
+
+  override def listNamespaces(namespace: Array[String]): Array[Array[String]] 
= {
+    if (namespace.isEmpty) {
+      return listNamespaces()
+    }
+
+    doNamespaceOperator(namespace) {
+      val dbname = admin.getDatabaseInfo(namespace.head).get().getDatabaseName
+      Array(Array(dbname))
+    }
+  }
+
+  override def loadNamespaceMetadata(namespace: Array[String]): 
util.Map[String, String] = {
+    doNamespaceOperator(namespace) {
+      
admin.getDatabaseInfo(namespace.head).get().getDatabaseDescriptor.getCustomProperties
+    }
+  }
+
+  override def createNamespace(
+      namespace: Array[String],
+      metadata: util.Map[String, String]): Unit = {
+    doNamespaceOperator(namespace) {
+      val databaseDescriptor = DatabaseDescriptor
+        .builder()
+        .customProperties(metadata)
+        .build();
+      admin.createDatabase(namespace.head, databaseDescriptor, false).get()
+    }
+  }
+
+  override def alterNamespace(
+      namespace: Array[String],
+      namespaceChanges: NamespaceChange*): Unit = {
+    new UnsupportedOperationException("Altering namespace is not supported 
now.")
+  }
+
+  override def dropNamespace(namespace: Array[String], cascade: Boolean): 
Boolean = {
+    doNamespaceOperator(namespace) {
+      admin.dropDatabase(namespace.head, false, cascade).get()
+      true
+    }
+  }
+
+  protected def doNamespaceOperator[T](namespace: Array[String])(f: => T): T = 
{
+    checkNamespace(namespace)
+    try {
+      f
+    } catch {
+      case e: ExecutionException if 
e.getCause.isInstanceOf[DatabaseNotExistException] =>
+        throw new NoSuchNamespaceException(namespace)
+    }
+  }
+
+  private def checkNamespace(namespace: Array[String]): Unit = {
+    Preconditions.checkArgument(
+      namespace.length == 1,
+      "Only single namespace is supported in Fluss.")
+  }
+}
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/SupportsFlussPartitionManagement.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/SupportsFlussPartitionManagement.scala
new file mode 100644
index 000000000..ed888e5fe
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/SupportsFlussPartitionManagement.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.catalog
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement
+import org.apache.spark.sql.types.StructType
+
+import java.util
+
+trait SupportsFlussPartitionManagement extends AbstractSparkTable with 
SupportsPartitionManagement {
+
+  override def partitionSchema(): StructType = _partitionSchema
+
+  override def createPartition(ident: InternalRow, properties: 
util.Map[String, String]): Unit = {
+    throw new UnsupportedOperationException("Creating partition is not 
supported")
+  }
+
+  override def dropPartition(ident: InternalRow): Boolean = {
+    throw new UnsupportedOperationException("Dropping partition is not 
supported")
+  }
+
+  override def replacePartitionMetadata(
+      ident: InternalRow,
+      properties: util.Map[String, String]): Unit = {
+    throw new UnsupportedOperationException("Replacing partition metadata is 
not supported")
+  }
+
+  override def loadPartitionMetadata(ident: InternalRow): util.Map[String, 
String] = {
+    throw new UnsupportedOperationException("Loading partition is not 
supported")
+  }
+
+  override def listPartitionIdentifiers(
+      names: Array[String],
+      ident: InternalRow): Array[InternalRow] = {
+    throw new UnsupportedOperationException("Listing partition is not 
supported")
+  }
+}
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/WithFlussAdmin.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/WithFlussAdmin.scala
new file mode 100644
index 000000000..fa62b91a4
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/WithFlussAdmin.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.catalog
+
+import org.apache.fluss.client.{Connection, ConnectionFactory}
+import org.apache.fluss.client.admin.Admin
+import org.apache.fluss.config.{Configuration => FlussConfiguration}
+import org.apache.fluss.utils.{IOUtils, Preconditions}
+
+import org.apache.spark.sql.connector.catalog.CatalogPlugin
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+trait WithFlussAdmin extends AutoCloseable {
+
+  private var _connection: Connection = _
+  private var _admin: Admin = _
+
+  // TODO: init lake spark catalog
+  protected var lakeCatalog: CatalogPlugin = _
+
+  protected def initFlussClient(options: CaseInsensitiveStringMap): Unit = {
+    val flussConfigs = new util.HashMap[String, String]()
+    options.entrySet().asScala.foreach {
+      entry: util.Map.Entry[String, String] => flussConfigs.put(entry.getKey, 
entry.getValue)
+    }
+
+    _connection = 
ConnectionFactory.createConnection(FlussConfiguration.fromMap(flussConfigs))
+    _admin = _connection.getAdmin
+  }
+
+  protected def admin: Admin = {
+    Preconditions.checkNotNull(_admin, "Fluss Admin is not initialized.")
+    _admin
+  }
+
+  override def close(): Unit = {
+    IOUtils.closeQuietly(_admin, "fluss-admin")
+    IOUtils.closeQuietly(_connection, "fluss-connection");
+  }
+
+}
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/types/FlussToSparkTypeVisitor.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/types/FlussToSparkTypeVisitor.scala
new file mode 100644
index 000000000..f9df1e9c0
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/types/FlussToSparkTypeVisitor.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.types
+
+import org.apache.fluss.types._
+
+import org.apache.spark.sql.types.{DataType => SparkDataType, DataTypes => 
SparkDataTypes}
+
+import scala.collection.JavaConverters._
+
+object FlussToSparkTypeVisitor extends DataTypeVisitor[SparkDataType] {
+
+  override def visit(charType: CharType): SparkDataType = {
+    org.apache.spark.sql.types.CharType(charType.getLength)
+  }
+
+  override def visit(stringType: StringType): SparkDataType = {
+    SparkDataTypes.StringType
+  }
+
+  override def visit(booleanType: BooleanType): SparkDataType = {
+    SparkDataTypes.BooleanType
+  }
+
+  override def visit(binaryType: BinaryType): SparkDataType = {
+    SparkDataTypes.BinaryType
+  }
+
+  override def visit(bytesType: BytesType): SparkDataType = {
+    SparkDataTypes.BinaryType
+  }
+
+  override def visit(decimalType: DecimalType): SparkDataType = {
+    org.apache.spark.sql.types.DecimalType(decimalType.getPrecision, 
decimalType.getScale)
+  }
+
+  override def visit(tinyIntType: TinyIntType): SparkDataType = {
+    SparkDataTypes.ByteType
+  }
+
+  override def visit(smallIntType: SmallIntType): SparkDataType = {
+    SparkDataTypes.ShortType
+  }
+
+  override def visit(intType: IntType): SparkDataType = {
+    SparkDataTypes.IntegerType
+  }
+
+  override def visit(bigIntType: BigIntType): SparkDataType = {
+    SparkDataTypes.LongType
+  }
+
+  override def visit(floatType: FloatType): SparkDataType = {
+    SparkDataTypes.FloatType
+  }
+
+  override def visit(doubleType: DoubleType): SparkDataType = {
+    SparkDataTypes.DoubleType
+  }
+
+  override def visit(dateType: DateType): SparkDataType = {
+    SparkDataTypes.DateType
+  }
+
+  override def visit(timeType: TimeType): SparkDataType = {
+    SparkDataTypes.IntegerType
+  }
+
+  override def visit(timestampType: TimestampType): SparkDataType = {
+    SparkDataTypes.TimestampNTZType
+  }
+
+  override def visit(localZonedTimestampType: LocalZonedTimestampType): 
SparkDataType = {
+    SparkDataTypes.TimestampType
+  }
+
+  override def visit(arrayType: ArrayType): SparkDataType = {
+    SparkDataTypes.createArrayType(
+      arrayType.getElementType.accept(this),
+      arrayType.getElementType.isNullable)
+  }
+
+  override def visit(mapType: MapType): SparkDataType = {
+    SparkDataTypes.createMapType(
+      mapType.getKeyType.accept(this),
+      mapType.getValueType.accept(this),
+      mapType.getValueType.isNullable
+    )
+  }
+
+  override def visit(rowType: RowType): SparkDataType = {
+    val sparkFields = rowType.getFields.asScala.map {
+      flussField =>
+        SparkDataTypes.createStructField(
+          flussField.getName,
+          flussField.getType.accept(this),
+          flussField.getType.isNullable)
+    }
+    SparkDataTypes.createStructType(sparkFields.toArray)
+  }
+}
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/types/SparkToFlussTypeVisitor.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/types/SparkToFlussTypeVisitor.scala
new file mode 100644
index 000000000..ee7e90633
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/types/SparkToFlussTypeVisitor.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.types
+
+import org.apache.fluss.types.{ArrayType => FlussArrayType, DataField => 
FlussDataField, DataType => FlussDataType, MapType => FlussMapType, _}
+
+import org.apache.spark.sql.types.{ArrayType => SparkArrayType, DataType => 
SparkDataType, MapType => SparkMapType, StructType, UserDefinedType}
+
+import scala.collection.JavaConverters._
+
+object SparkToFlussTypeVisitor {
+
+  def visit(dataType: SparkDataType): FlussDataType = {
+    dataType match {
+      case st: StructType =>
+        visitStructType(st)
+      case mt: SparkMapType =>
+        visitMapType(mt)
+      case at: SparkArrayType =>
+        visitArrayType(at)
+      case _: UserDefinedType[_] =>
+        throw new UnsupportedOperationException("User-defined type is not 
supported");
+      case t =>
+        visitPrimitiveType(t)
+    }
+  }
+
+  private def visitStructType(st: StructType): RowType = {
+    val flussDataFields = st.fields.map {
+      field =>
+        val flussDataType = visit(field.dataType)
+        new FlussDataField(field.name, flussDataType, 
field.getComment().orNull)
+    }
+    new RowType(flussDataFields.toList.asJava)
+  }
+
+  private def visitMapType(mt: SparkMapType): FlussMapType = {
+    new FlussMapType(visit(mt.keyType), 
visit(mt.valueType).copy(mt.valueContainsNull))
+  }
+
+  private def visitArrayType(at: SparkArrayType): FlussArrayType = {
+    new FlussArrayType(visit(at.elementType).copy(at.containsNull))
+  }
+
+  private def visitPrimitiveType(t: SparkDataType): FlussDataType = {
+    t match {
+      case _: org.apache.spark.sql.types.BooleanType =>
+        new BooleanType()
+      case _: org.apache.spark.sql.types.ByteType =>
+        new TinyIntType()
+      case _: org.apache.spark.sql.types.ShortType =>
+        new SmallIntType()
+      case _: org.apache.spark.sql.types.IntegerType =>
+        new IntType()
+      case _: org.apache.spark.sql.types.LongType =>
+        new BigIntType()
+      case _: org.apache.spark.sql.types.FloatType =>
+        new FloatType()
+      case _: org.apache.spark.sql.types.DoubleType =>
+        new DoubleType()
+      case dt: org.apache.spark.sql.types.DecimalType =>
+        new DecimalType(dt.precision, dt.scale)
+      case x: org.apache.spark.sql.types.BinaryType =>
+        new BytesType()
+      case _: org.apache.spark.sql.types.VarcharType =>
+        new StringType()
+      case ct: org.apache.spark.sql.types.CharType =>
+        new CharType(ct.length)
+      case _: org.apache.spark.sql.types.StringType =>
+        new StringType()
+      case _: org.apache.spark.sql.types.DateType =>
+        new DateType()
+      case _: org.apache.spark.sql.types.TimestampType =>
+        // spark only support 6 digits of precision
+        new LocalZonedTimestampType(6)
+      case _: org.apache.spark.sql.types.TimestampNTZType =>
+        // spark only support 6 digits of precision
+        new TimestampType(6)
+      case _ =>
+        throw new UnsupportedOperationException(s"Data 
type(${t.catalogString}) is not supported");
+    }
+  }
+
+}
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/FlussIdentityTransform.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/FlussIdentityTransform.scala
new file mode 100644
index 000000000..9d9b4a5c3
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/FlussIdentityTransform.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.connector.expressions.{FieldReference, 
IdentityTransform, Transform}
+
+object FlussIdentityTransform {
+  def unapply(transform: Transform): Option[Seq[String]] = {
+    transform match {
+      case IdentityTransform(FieldReference(parts)) =>
+        Some(parts)
+      case _ =>
+        None
+    }
+  }
+}
diff --git a/fluss-spark/fluss-spark-ut/pom.xml 
b/fluss-spark/fluss-spark-ut/pom.xml
new file mode 100644
index 000000000..7592c6ffd
--- /dev/null
+++ b/fluss-spark/fluss-spark-ut/pom.xml
@@ -0,0 +1,125 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.fluss</groupId>
+        <artifactId>fluss-spark</artifactId>
+        <version>0.9-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>fluss-spark-ut_${scala.binary.version}</artifactId>
+    <name>Fluss : Engine Spark : UT</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.fluss</groupId>
+            <artifactId>fluss-spark-common_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.fluss</groupId>
+            <artifactId>fluss-server</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.fluss</groupId>
+            <artifactId>fluss-server</artifactId>
+            <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.fluss</groupId>
+            <artifactId>fluss-client</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.fluss</groupId>
+            <artifactId>fluss-test-utils</artifactId>
+        </dependency>
+
+        <!-- for curator TestingServer  -->
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-test</artifactId>
+            <version>${curator.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>3.8.1</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>prepare-test-jar</id>
+                        <phase>test-compile</phase>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git 
a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussCatalogTest.scala
 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussCatalogTest.scala
new file mode 100644
index 000000000..a9022fff4
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussCatalogTest.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark
+
+import org.apache.fluss.metadata.{DatabaseDescriptor, Schema, TableDescriptor, 
TablePath}
+import org.apache.fluss.types.{DataTypes, RowType}
+
+import org.apache.spark.sql.Row
+import org.assertj.core.api.Assertions.{assertThat, assertThatList}
+
+import scala.collection.JavaConverters._
+
+class FlussCatalogTest extends FlussSparkTestBase {
+
+  test("Catalog: namespaces") {
+    // Always a default database 'fluss'.
+    checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) :: Nil)
+
+    sql("CREATE DATABASE testdb COMMENT 'created by spark'")
+    checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) :: Row("testdb") 
:: Nil)
+
+    checkAnswer(
+      sql("DESC DATABASE testdb").filter("info_name != 'Owner'"),
+      Row("Catalog Name", "fluss_catalog") :: Row("Namespace Name", "testdb") 
:: Row(
+        "Comment",
+        "created by spark") :: Nil
+    )
+
+    sql("DROP DATABASE testdb")
+    checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) :: Nil)
+  }
+
+  test("Catalog: basic table") {
+    sql(s"CREATE TABLE $DEFAULT_DATABASE.test_tbl (id int, name string) 
COMMENT 'my test table'")
+    checkAnswer(sql("SHOW TABLES"), Row(DEFAULT_DATABASE, "test_tbl", false) 
:: Nil)
+    checkAnswer(sql("DESC test_tbl"), Row("id", "int", null) :: Row("name", 
"string", null) :: Nil)
+
+    val testTable = admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, 
"test_tbl")).get()
+    assertThat(testTable.getTablePath.getTableName).isEqualTo("test_tbl")
+    assertThat(testTable.getComment.orElse(null)).isEqualTo("my test table")
+    assertThat(testTable.getRowType).isEqualTo(
+      RowType.builder().field("id", DataTypes.INT()).field("name", 
DataTypes.STRING()).build())
+
+    sql(s"""
+           |CREATE TABLE $DEFAULT_DATABASE.test_pt_tbl (id int, name string, 
pt string)
+           |PARTITIONED BY (pt)
+           |TBLPROPERTIES("key" = "value")
+           |""".stripMargin)
+
+    val testPartitionedTable =
+      admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, "test_pt_tbl")).get()
+    assertThat(testPartitionedTable.getRowType).isEqualTo(
+      RowType
+        .builder()
+        .field("id", DataTypes.INT())
+        .field("name", DataTypes.STRING())
+        .field("pt", DataTypes.STRING())
+        .build())
+    assertThat(testPartitionedTable.getPartitionKeys.get(0)).isEqualTo("pt")
+    
assertThat(testPartitionedTable.getCustomProperties.containsKey("key")).isEqualTo(true)
+    assertThat(
+      
testPartitionedTable.getCustomProperties.getRawValue("key").get().asInstanceOf[String])
+      .isEqualTo("value")
+
+    sql("DROP TABLE test_tbl")
+    sql("DROP TABLE test_pt_tbl")
+    checkAnswer(sql("SHOW TABLES"), Nil)
+  }
+
+  test("Catalog: primary-key table") {
+    sql(s"""
+           |CREATE TABLE $DEFAULT_DATABASE.test_tbl (id int, name string, pt 
string)
+           |PARTITIONED BY (pt)
+           |TBLPROPERTIES("primary.key" = "id,pt")
+           |""".stripMargin)
+
+    val tbl1 = admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, 
"test_tbl")).get()
+    assertThatList(tbl1.getPrimaryKeys).hasSameElementsAs(Seq("id", 
"pt").toList.asJava)
+    assertThat(tbl1.getNumBuckets).isEqualTo(1)
+    assertThat(tbl1.getBucketKeys.contains("id")).isEqualTo(true)
+    assertThat(tbl1.getPartitionKeys.contains("pt")).isEqualTo(true)
+
+    sql(
+      s"""
+         |CREATE TABLE $DEFAULT_DATABASE.test_tbl2 (pk1 int, pk2 long, name 
string, pt1 string, pt2 string)
+         |PARTITIONED BY (pt1, pt2)
+         |TBLPROPERTIES("primary.key" = "pk1,pk2,pt1,pt2", "bucket.num" = 3, 
"bucket.key" = "pk1")
+         |""".stripMargin)
+
+    val tbl2 = admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, 
"test_tbl2")).get()
+    assertThatList(tbl2.getPrimaryKeys).hasSameElementsAs(
+      Seq("pk1", "pk2", "pt1", "pt2").toList.asJava)
+    assertThat(tbl2.getNumBuckets).isEqualTo(3)
+    
assertThatList(tbl2.getBucketKeys).hasSameElementsAs(Seq("pk1").toList.asJava)
+  }
+
+  test("Catalog: check namespace and table created by admin") {
+    val dbDesc = DatabaseDescriptor.builder().comment("created by 
admin").build()
+    admin.createDatabase("db_by_admin", dbDesc, true).get()
+    checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) :: 
Row("db_by_admin") :: Nil)
+
+    sql("USE db_by_admin")
+    val tablePath = TablePath.of("db_by_admin", "tbl_by_admin")
+    val rt = RowType
+      .builder()
+      .field("id", DataTypes.INT())
+      .field("name", DataTypes.STRING())
+      .field("pt", DataTypes.STRING())
+      .build()
+    val tableDesc = TableDescriptor
+      .builder()
+      .schema(Schema.newBuilder().fromRowType(rt).build())
+      .partitionedBy("pt")
+      .build()
+    admin.createTable(tablePath, tableDesc, false).get()
+    checkAnswer(sql("SHOW TABLES"), Row("db_by_admin", "tbl_by_admin", false) 
:: Nil)
+    checkAnswer(
+      sql("DESC tbl_by_admin"),
+      Row("id", "int", null) :: Row("name", "string", null) :: Row("pt", 
"string", null) :: Nil)
+
+    admin.dropTable(tablePath, true).get()
+    checkAnswer(sql("SHOW TABLES"), Nil)
+
+    admin.dropDatabase("db_by_admin", true, true).get()
+    checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) :: Nil)
+  }
+}
diff --git 
a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
new file mode 100644
index 000000000..08b6d4600
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark
+
+import org.apache.fluss.client.{Connection, ConnectionFactory}
+import org.apache.fluss.client.admin.Admin
+import org.apache.fluss.config.{ConfigOptions, Configuration}
+import org.apache.fluss.server.testutils.FlussClusterExtension
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.test.SharedSparkSession
+import org.junit.jupiter.api.extension.RegisterExtension
+import org.scalactic.source.Position
+import org.scalatest.Tag
+
+import java.time.Duration
+
+class FlussSparkTestBase extends QueryTest with SharedSparkSession {
+
+  import FlussSparkTestBase._
+
+  protected val DEFAULT_DATABASE = "fluss";
+
+  protected var conn: Connection = _
+  protected var admin: Admin = _
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.sql.catalog.fluss_catalog", classOf[SparkCatalog].getName)
+      .set("spark.sql.catalog.fluss_catalog.bootstrap.servers", 
bootstrapServers)
+      .set("spark.sql.defaultCatalog", "fluss_catalog")
+  }
+
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+    conn = ConnectionFactory.createConnection(clientConf)
+    admin = conn.getAdmin
+
+    sql(s"USE $DEFAULT_DATABASE")
+  }
+
+  override def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
+      pos: Position): Unit = {
+    println(testName)
+    super.test(testName, testTags: _*)(testFun)(pos)
+  }
+
+}
+
+@RegisterExtension
+object FlussSparkTestBase {
+  val FLUSS_CLUSTER_EXTENSION: FlussClusterExtension =
+    FlussClusterExtension.builder
+      .setClusterConf(
+        new Configuration()
+          .set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1))
+      )
+      .setNumOfTabletServers(3)
+      .build
+
+  FLUSS_CLUSTER_EXTENSION.start()
+
+  val clientConf: Configuration = FLUSS_CLUSTER_EXTENSION.getClientConfig
+  val bootstrapServers: String = FLUSS_CLUSTER_EXTENSION.getBootstrapServers
+}
diff --git a/fluss-spark/pom.xml b/fluss-spark/pom.xml
new file mode 100644
index 000000000..777352809
--- /dev/null
+++ b/fluss-spark/pom.xml
@@ -0,0 +1,337 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.fluss</groupId>
+        <artifactId>fluss</artifactId>
+        <version>0.9-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>fluss-spark</artifactId>
+    <name>Fluss : Engine Spark :</name>
+    <packaging>pom</packaging>
+
+    <modules>
+        <module>fluss-spark-common</module>
+        <module>fluss-spark-ut</module>
+        <module>fluss-spark-3.5</module>
+        <module>fluss-spark-3.4</module>
+    </modules>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.fluss</groupId>
+            <artifactId>fluss-client</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.fluss</groupId>
+            <artifactId>fluss-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>${scala.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-reflect</artifactId>
+            <version>${scala.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-compiler</artifactId>
+            <version>${scala.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.scalatest</groupId>
+            <artifactId>scalatest_${scala.binary.version}</artifactId>
+            <version>3.1.0</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.apache.spark</groupId>
+                <artifactId>spark-sql_${scala.binary.version}</artifactId>
+                <exclusions>
+                    <exclusion>
+                        <groupId>log4j</groupId>
+                        <artifactId>log4j</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.slf4j</groupId>
+                        <artifactId>slf4j-log4j12</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.apache.logging.log4j</groupId>
+                        <artifactId>log4j-slf4j2-impl</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.spark</groupId>
+                <artifactId>spark-core_${scala.binary.version}</artifactId>
+                <exclusions>
+                    <exclusion>
+                        <groupId>log4j</groupId>
+                        <artifactId>log4j</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.slf4j</groupId>
+                        <artifactId>slf4j-log4j12</artifactId>
+                    </exclusion>
+                    <!-- SPARK-40511 upgrades SLF4J2, which is not compatible 
w/ SLF4J1 -->
+                    <exclusion>
+                        <groupId>org.apache.logging.log4j</groupId>
+                        <artifactId>log4j-slf4j2-impl</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>com.google.protobuf</groupId>
+                        <artifactId>protobuf-java</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.spark</groupId>
+                <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+                <exclusions>
+                    <exclusion>
+                        <groupId>log4j</groupId>
+                        <artifactId>log4j</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.slf4j</groupId>
+                        <artifactId>slf4j-log4j12</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.spark</groupId>
+                <artifactId>spark-hive_${scala.binary.version}</artifactId>
+                <exclusions>
+                    <exclusion>
+                        <groupId>log4j</groupId>
+                        <artifactId>log4j</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.slf4j</groupId>
+                        <artifactId>slf4j-log4j12</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+
+
+            <dependency>
+                <groupId>org.apache.spark</groupId>
+                <artifactId>spark-sql_${scala.binary.version}</artifactId>
+                <classifier>tests</classifier>
+                <scope>test</scope>
+                <exclusions>
+                    <exclusion>
+                        <groupId>log4j</groupId>
+                        <artifactId>log4j</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.slf4j</groupId>
+                        <artifactId>slf4j-log4j12</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.apache.logging.log4j</groupId>
+                        <artifactId>log4j-slf4j2-impl</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.spark</groupId>
+                <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+                <classifier>tests</classifier>
+                <scope>test</scope>
+                <exclusions>
+                    <exclusion>
+                        <groupId>log4j</groupId>
+                        <artifactId>log4j</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.slf4j</groupId>
+                        <artifactId>slf4j-log4j12</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.spark</groupId>
+                <artifactId>spark-core_${scala.binary.version}</artifactId>
+                <classifier>tests</classifier>
+                <scope>test</scope>
+                <exclusions>
+                    <exclusion>
+                        <groupId>log4j</groupId>
+                        <artifactId>log4j</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.slf4j</groupId>
+                        <artifactId>slf4j-log4j12</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.apache.logging.log4j</groupId>
+                        <artifactId>log4j-slf4j2-impl</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+    <build>
+        <pluginManagement>
+            <plugins>
+                <plugin>
+                    <groupId>org.scalastyle</groupId>
+                    <artifactId>scalastyle-maven-plugin</artifactId>
+                    <version>1.0.0</version>
+                    <executions>
+                        <execution>
+                            <goals>
+                                <goal>check</goal>
+                            </goals>
+                        </execution>
+                    </executions>
+                    <configuration>
+                        <verbose>false</verbose>
+                        <failOnViolation>true</failOnViolation>
+                        
<includeTestSourceDirectory>true</includeTestSourceDirectory>
+                        <failOnWarning>false</failOnWarning>
+                        
<sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
+                        
<testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
+                        
<outputFile>${project.basedir}/target/scalastyle-output.xml</outputFile>
+                        <inputEncoding>UTF-8</inputEncoding>
+                        <outputEncoding>UTF-8</outputEncoding>
+                    </configuration>
+                </plugin>
+
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-compiler-plugin</artifactId>
+                    <version>3.8.0</version>
+                    <configuration>
+                        <source>${target.java.version}</source>
+                        <target>${target.java.version}</target>
+                        
<useIncrementalCompilation>false</useIncrementalCompilation>
+                        <compilerArgs>
+                            <arg>-Xpkginfo:always</arg>
+                            <arg>-Xlint:deprecation</arg>
+                        </compilerArgs>
+                    </configuration>
+                </plugin>
+
+                <plugin>
+                    <groupId>net.alchim31.maven</groupId>
+                    <artifactId>scala-maven-plugin</artifactId>
+                    <version>3.2.2</version>
+                    <executions>
+                        <!-- Run scala compiler in the process-resources 
phase, so that dependencies on
+                            scala classes can be resolved later in the (Java) 
compile phase -->
+                        <execution>
+                            <id>scala-compile-first</id>
+                            <phase>process-resources</phase>
+                            <goals>
+                                <goal>add-source</goal>
+                                <goal>compile</goal>
+                            </goals>
+                        </execution>
+
+                        <!-- Run scala compiler in the process-test-resources 
phase, so that dependencies on
+                             scala classes can be resolved later in the (Java) 
test-compile phase -->
+                        <execution>
+                            <id>scala-test-compile</id>
+                            <phase>process-test-resources</phase>
+                            <goals>
+                                <goal>testCompile</goal>
+                            </goals>
+                        </execution>
+                    </executions>
+                    <configuration>
+                        <scalaVersion>${scala.version}</scalaVersion>
+                        
<checkMultipleScalaVersions>false</checkMultipleScalaVersions>
+                        <args>
+                            <arg>-nobootcp</arg>
+                            <arg>-target:jvm-${target.java.version}</arg>
+                        </args>
+                    </configuration>
+                </plugin>
+
+                <plugin>
+                    <groupId>org.scalatest</groupId>
+                    <artifactId>scalatest-maven-plugin</artifactId>
+                    <version>2.1.0</version>
+                    <configuration>
+                        
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+                        <junitxml>.</junitxml>
+                        <argLine>-ea -Xmx4g -Xss4m -XX:MaxMetaspaceSize=2g 
-XX:ReservedCodeCacheSize=128m ${extraJavaTestArgs} 
-Dio.netty.tryReflectionSetAccessible=true</argLine>
+                        <filereports>FlussTestSuite.txt</filereports>
+                        <forkMode>once</forkMode>
+                        <noScalaTestIgnore>true</noScalaTestIgnore>
+                    </configuration>
+                    <executions>
+                        <execution>
+                            <id>test</id>
+                            <goals>
+                                <goal>test</goal>
+                            </goals>
+                        </execution>
+                    </executions>
+                </plugin>
+            </plugins>
+        </pluginManagement>
+
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+            </plugin>
+
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+            </plugin>
+
+            <plugin>
+                <groupId>org.scalatest</groupId>
+                <artifactId>scalatest-maven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file
diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml
index 7f0a4cc9b..16e942860 100644
--- a/fluss-test-coverage/pom.xml
+++ b/fluss-test-coverage/pom.xml
@@ -126,6 +126,7 @@
                                                 
<exclude>fluss-test-coverage/**</exclude>
                                                 
<exclude>fluss-test-utils/**</exclude>
                                                 
<exclude>fluss-flink/**</exclude>
+                                                
<exclude>fluss-spark/**</exclude>
                                                 
<exclude>fluss-lake/**</exclude>
                                             </excludes>
                                         </resource>
@@ -179,6 +180,44 @@
             </build>
         </profile>
 
+        <profile>
+            <id>test-spark3</id>
+            <build>
+                <plugins>
+                    <!-- required by jacoco for the goal: check to work -->
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-resources-plugin</artifactId>
+                        <executions>
+                            <execution>
+                                <id>copy-class-files</id>
+                                <phase>generate-resources</phase>
+                                <goals>
+                                    <goal>copy-resources</goal>
+                                </goals>
+                                <configuration>
+                                    <overwrite>false</overwrite>
+                                    <resources>
+                                        <resource>
+                                            
<directory>${project.basedir}/../</directory>
+                                            <includes>
+                                                
<include>fluss-spark/**/target/classes/**</include>
+                                            </includes>
+                                            <excludes>
+                                                
<exclude>fluss-test-coverage/**</exclude>
+                                                
<exclude>fluss-test-utils/**</exclude>
+                                            </excludes>
+                                        </resource>
+                                    </resources>
+                                    
<outputDirectory>${project.build.directory}/classes</outputDirectory>
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+
         <profile>
             <id>test-lake</id>
             <build>
@@ -288,6 +327,8 @@
                                         </limit>
                                     </limits>
                                     <excludes>
+                                        
<exclude>org.apache.fluss.spark.*</exclude>
+                                        
<exclude>org.apache.spark.sql.*</exclude>
                                         
<exclude>org.apache.fluss.protogen.*</exclude>
                                         
<exclude>org.apache.fluss.memory.*</exclude>
                                         
<exclude>org.apache.fluss.utils.*</exclude>
diff --git a/pom.xml b/pom.xml
index 247040141..99558b790 100644
--- a/pom.xml
+++ b/pom.xml
@@ -61,6 +61,7 @@
         <module>fluss-test-coverage</module>
         <module>fluss-server</module>
         <module>fluss-flink</module>
+        <module>fluss-spark</module>
         <module>fluss-protogen</module>
         <module>fluss-jmh</module>
         <module>fluss-lake</module>
@@ -91,6 +92,13 @@
         <paimon.version>1.3.1</paimon.version>
         <iceberg.version>1.10.0</iceberg.version>
 
+        <!-- spark & scala -->
+        <scala212.version>2.12.18</scala212.version>
+        <scala213.version>2.13.16</scala213.version>
+        <scala.binary.version>2.12</scala.binary.version>
+        <scala.version>${scala212.version}</scala.version>
+        <spark.version>3.5.7</spark.version>
+
         <fluss.hadoop.version>3.4.0</fluss.hadoop.version>
         <frocksdb.version>6.20.3-ververica-2.0</frocksdb.version>
         <slf4j.version>1.7.36</slf4j.version>
@@ -1007,6 +1015,17 @@
 
                             <removeUnusedImports/>
                         </java>
+                        <scala>
+                            <scalafmt>
+                                <version>3.10.2</version>
+                                
<file>${maven.multiModuleProjectDirectory}/.scalafmt.conf</file>
+                            </scalafmt>
+
+                            <licenseHeader>
+                                
<file>${maven.multiModuleProjectDirectory}/copyright.txt</file>
+                                <delimiter>${spotless.delimiter}</delimiter>
+                            </licenseHeader>
+                        </scala>
                     </configuration>
                     <executions>
                         <execution>


Reply via email to