This is an automated email from the ASF dual-hosted git repository.

fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 873217554 [#4722] feat(paimon-spark-connector): support schema and 
table DDL and table DML for GravitinoPaimonCatalog in paimon spark connector 
(#5722)
873217554 is described below

commit 873217554c7022f191013605133c43872e8e5bf6
Author: cai can <[email protected]>
AuthorDate: Mon Dec 16 11:36:10 2024 +0800

    [#4722] feat(paimon-spark-connector): support schema and table DDL and 
table DML for GravitinoPaimonCatalog in paimon spark connector (#5722)
    
    ### What changes were proposed in this pull request?
    support schema and table DDL and table DML for GravitinoPaimonCatalog in
    paimon spark connector.
    
    ### Why are the changes needed?
    Fix:
    https://github.com/apache/gravitino/issues/4722
    https://github.com/apache/gravitino/issues/4717
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    new ITs and UTs.
    
    ---------
    
    Co-authored-by: caican <[email protected]>
---
 .../catalog/lakehouse/paimon/PaimonConstants.java  |  57 ++++++++++
 .../lakehouse/paimon/PaimonPropertiesUtils.java    |  95 ++++++++++++++++
 .../paimon/PaimonCatalogPropertiesMetadata.java    |  26 ++---
 .../paimon/PaimonSchemaPropertiesMetadata.java     |   2 +-
 .../paimon/PaimonTablePropertiesMetadata.java      |  16 +--
 .../paimon/storage/PaimonOSSFileSystemConfig.java  |   7 +-
 .../paimon/storage/PaimonS3FileSystemConfig.java   |   7 +-
 docs/lakehouse-paimon-catalog.md                   |  35 +++---
 spark-connector/spark-common/build.gradle.kts      |  10 ++
 .../spark/connector/catalog/BaseCatalog.java       |   4 +-
 .../connector/paimon/GravitinoPaimonCatalog.java   |  84 +++++++++++++++
 .../paimon/PaimonPropertiesConstants.java          |  51 +++++++++
 .../paimon/PaimonPropertiesConverter.java          |  67 ++++++++++++
 .../spark/connector/paimon/SparkPaimonTable.java   |  88 +++++++++++++++
 .../connector/version/CatalogNameAdaptor.java      |  21 ++--
 .../connector/integration/test/SparkCommonIT.java  |  20 ++--
 .../integration/test/hive/SparkHiveCatalogIT.java  |   5 +
 .../test/iceberg/SparkIcebergCatalogIT.java        |   5 +
 .../SparkPaimonCatalogFilesystemBackendIT.java     |  71 ++++++++++++
 .../test/paimon/SparkPaimonCatalogIT.java          | 119 +++++++++++++++++++++
 .../integration/test/util/SparkTableInfo.java      |   7 ++
 .../integration/test/util/SparkUtilIT.java         |  11 +-
 .../paimon/TestPaimonPropertiesConverter.java      | 106 ++++++++++++++++++
 spark-connector/v3.3/spark/build.gradle.kts        |  11 ++
 .../paimon/GravitinoPaimonCatalogSpark33.java}     |  18 +---
 .../SparkPaimonCatalogFilesystemBackendIT33.java}  |  20 ++--
 .../connector/version/TestCatalogNameAdaptor.java  |   4 +
 spark-connector/v3.4/spark/build.gradle.kts        |  11 ++
 .../paimon/GravitinoPaimonCatalogSpark34.java}     |  26 ++---
 .../SparkPaimonCatalogFilesystemBackendIT34.java}  |  21 ++--
 .../connector/version/TestCatalogNameAdaptor.java  |   4 +
 spark-connector/v3.5/spark/build.gradle.kts        |  11 ++
 .../paimon/GravitinoPaimonCatalogSpark35.java}     |  18 +---
 .../SparkPaimonCatalogFilesystemBackendIT35.java}  |  21 ++--
 .../connector/version/TestCatalogNameAdaptor.java  |   4 +
 35 files changed, 945 insertions(+), 138 deletions(-)

diff --git 
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonConstants.java
 
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonConstants.java
new file mode 100644
index 000000000..291a7ea96
--- /dev/null
+++ 
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonConstants.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.lakehouse.paimon;
+
/**
 * Property-name constants for the Paimon catalog, kept in catalog-common so the
 * Gravitino Paimon catalog and the Paimon Spark connector share one definition
 * of each key.
 */
public class PaimonConstants {

  private PaimonConstants() {
    // Constants holder; not instantiable.
  }

  // Paimon catalog properties constants
  public static final String CATALOG_BACKEND = "catalog-backend";
  public static final String METASTORE = "metastore";
  public static final String URI = "uri";
  public static final String WAREHOUSE = "warehouse";
  public static final String CATALOG_BACKEND_NAME = "catalog-backend-name";

  // JDBC backend credentials: Gravitino uses dash-separated keys, Paimon expects
  // dot-separated keys, so both spellings are defined here.
  public static final String GRAVITINO_JDBC_USER = "jdbc-user";
  public static final String PAIMON_JDBC_USER = "jdbc.user";

  public static final String GRAVITINO_JDBC_PASSWORD = "jdbc-password";
  public static final String PAIMON_JDBC_PASSWORD = "jdbc.password";

  public static final String GRAVITINO_JDBC_DRIVER = "jdbc-driver";

  // S3 properties needed by Paimon
  public static final String S3_ENDPOINT = "s3.endpoint";
  public static final String S3_ACCESS_KEY = "s3.access-key";
  public static final String S3_SECRET_KEY = "s3.secret-key";

  // OSS related properties
  public static final String OSS_ENDPOINT = "fs.oss.endpoint";
  public static final String OSS_ACCESS_KEY = "fs.oss.accessKeyId";
  public static final String OSS_SECRET_KEY = "fs.oss.accessKeySecret";

  // Paimon table properties constants (was mislabeled "Iceberg" — these keys are
  // consumed by PaimonTablePropertiesMetadata)
  public static final String COMMENT = "comment";
  public static final String OWNER = "owner";
  public static final String BUCKET_KEY = "bucket-key";
  public static final String MERGE_ENGINE = "merge-engine";
  public static final String SEQUENCE_FIELD = "sequence.field";
  public static final String ROWKIND_FIELD = "rowkind.field";
  public static final String PRIMARY_KEY = "primary-key";
  public static final String PARTITION = "partition";
}
diff --git 
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonPropertiesUtils.java
 
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonPropertiesUtils.java
new file mode 100644
index 000000000..0dcf24f3a
--- /dev/null
+++ 
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonPropertiesUtils.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.lakehouse.paimon;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.gravitino.storage.OSSProperties;
+import org.apache.gravitino.storage.S3Properties;
+
+public class PaimonPropertiesUtils {
+
+  // Map that maintains the mapping of keys in Gravitino to that in Paimon, 
for example, users
+  // will only need to set the configuration 'catalog-backend' in Gravitino 
and Gravitino will
+  // change it to `catalogType` automatically and pass it to Paimon.
+  public static final Map<String, String> GRAVITINO_CONFIG_TO_PAIMON;
+
+  static {
+    Map<String, String> map = new HashMap();
+    map.put(PaimonConstants.CATALOG_BACKEND, PaimonConstants.CATALOG_BACKEND);
+    map.put(PaimonConstants.GRAVITINO_JDBC_DRIVER, 
PaimonConstants.GRAVITINO_JDBC_DRIVER);
+    map.put(PaimonConstants.GRAVITINO_JDBC_USER, 
PaimonConstants.PAIMON_JDBC_USER);
+    map.put(PaimonConstants.GRAVITINO_JDBC_PASSWORD, 
PaimonConstants.PAIMON_JDBC_PASSWORD);
+    map.put(PaimonConstants.URI, PaimonConstants.URI);
+    map.put(PaimonConstants.WAREHOUSE, PaimonConstants.WAREHOUSE);
+    map.put(PaimonConstants.CATALOG_BACKEND_NAME, 
PaimonConstants.CATALOG_BACKEND_NAME);
+    // S3
+    map.put(S3Properties.GRAVITINO_S3_ENDPOINT, PaimonConstants.S3_ENDPOINT);
+    map.put(S3Properties.GRAVITINO_S3_ACCESS_KEY_ID, 
PaimonConstants.S3_ACCESS_KEY);
+    map.put(S3Properties.GRAVITINO_S3_SECRET_ACCESS_KEY, 
PaimonConstants.S3_SECRET_KEY);
+    // OSS
+    map.put(OSSProperties.GRAVITINO_OSS_ENDPOINT, 
PaimonConstants.OSS_ENDPOINT);
+    map.put(OSSProperties.GRAVITINO_OSS_ACCESS_KEY_ID, 
PaimonConstants.OSS_ACCESS_KEY);
+    map.put(OSSProperties.GRAVITINO_OSS_ACCESS_KEY_SECRET, 
PaimonConstants.OSS_SECRET_KEY);
+    GRAVITINO_CONFIG_TO_PAIMON = Collections.unmodifiableMap(map);
+  }
+
+  /**
+   * Converts Gravitino properties to Paimon catalog properties, the common 
transform logic shared
+   * by Spark connector, Gravitino Paimon catalog.
+   *
+   * @param gravitinoProperties a map of Gravitino configuration properties.
+   * @return a map containing Paimon catalog properties.
+   */
+  public static Map<String, String> toPaimonCatalogProperties(
+      Map<String, String> gravitinoProperties) {
+    Map<String, String> paimonProperties = new HashMap<>();
+    gravitinoProperties.forEach(
+        (key, value) -> {
+          if (GRAVITINO_CONFIG_TO_PAIMON.containsKey(key)) {
+            paimonProperties.put(GRAVITINO_CONFIG_TO_PAIMON.get(key), value);
+          }
+        });
+    return paimonProperties;
+  }
+
+  /**
+   * Get catalog backend name from Gravitino catalog properties.
+   *
+   * @param catalogProperties a map of Gravitino catalog properties.
+   * @return catalog backend name.
+   */
+  public static String getCatalogBackendName(Map<String, String> 
catalogProperties) {
+    String backendName = 
catalogProperties.get(PaimonConstants.CATALOG_BACKEND_NAME);
+    if (backendName != null) {
+      return backendName;
+    }
+
+    String catalogBackend = 
catalogProperties.get(PaimonConstants.CATALOG_BACKEND);
+    return Optional.ofNullable(catalogBackend)
+        .map(s -> s.toLowerCase(Locale.ROOT))
+        .orElseThrow(
+            () ->
+                new UnsupportedOperationException(
+                    String.format("Unsupported catalog backend: %s", 
catalogBackend)));
+  }
+}
diff --git 
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalogPropertiesMetadata.java
 
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalogPropertiesMetadata.java
index e3b59bff3..4c9dcb07a 100644
--- 
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalogPropertiesMetadata.java
+++ 
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalogPropertiesMetadata.java
@@ -45,20 +45,22 @@ import org.apache.gravitino.storage.S3Properties;
  */
 public class PaimonCatalogPropertiesMetadata extends 
BaseCatalogPropertiesMetadata {
 
-  @VisibleForTesting public static final String GRAVITINO_CATALOG_BACKEND = 
"catalog-backend";
-  public static final String PAIMON_METASTORE = "metastore";
-  public static final String WAREHOUSE = "warehouse";
-  public static final String URI = "uri";
-  public static final String GRAVITINO_JDBC_USER = "jdbc-user";
-  public static final String PAIMON_JDBC_USER = "jdbc.user";
-  public static final String GRAVITINO_JDBC_PASSWORD = "jdbc-password";
-  public static final String PAIMON_JDBC_PASSWORD = "jdbc.password";
-  public static final String GRAVITINO_JDBC_DRIVER = "jdbc-driver";
+  @VisibleForTesting
+  public static final String GRAVITINO_CATALOG_BACKEND = 
PaimonConstants.CATALOG_BACKEND;
+
+  public static final String PAIMON_METASTORE = PaimonConstants.METASTORE;
+  public static final String WAREHOUSE = PaimonConstants.WAREHOUSE;
+  public static final String URI = PaimonConstants.URI;
+  public static final String GRAVITINO_JDBC_USER = 
PaimonConstants.GRAVITINO_JDBC_USER;
+  public static final String PAIMON_JDBC_USER = 
PaimonConstants.PAIMON_JDBC_USER;
+  public static final String GRAVITINO_JDBC_PASSWORD = 
PaimonConstants.GRAVITINO_JDBC_PASSWORD;
+  public static final String PAIMON_JDBC_PASSWORD = 
PaimonConstants.PAIMON_JDBC_PASSWORD;
+  public static final String GRAVITINO_JDBC_DRIVER = 
PaimonConstants.GRAVITINO_JDBC_DRIVER;
 
   // S3 properties needed by Paimon
-  public static final String S3_ENDPOINT = "s3.endpoint";
-  public static final String S3_ACCESS_KEY = "s3.access-key";
-  public static final String S3_SECRET_KEY = "s3.secret-key";
+  public static final String S3_ENDPOINT = PaimonConstants.S3_ENDPOINT;
+  public static final String S3_ACCESS_KEY = PaimonConstants.S3_ACCESS_KEY;
+  public static final String S3_SECRET_KEY = PaimonConstants.S3_SECRET_KEY;
 
   public static final Map<String, String> GRAVITINO_CONFIG_TO_PAIMON =
       ImmutableMap.of(
diff --git 
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonSchemaPropertiesMetadata.java
 
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonSchemaPropertiesMetadata.java
index 9a6ddb5a1..3da05099c 100644
--- 
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonSchemaPropertiesMetadata.java
+++ 
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonSchemaPropertiesMetadata.java
@@ -34,7 +34,7 @@ import org.apache.gravitino.connector.PropertyEntry;
  */
 public class PaimonSchemaPropertiesMetadata extends BasePropertiesMetadata {
 
-  public static final String COMMENT = "comment";
+  public static final String COMMENT = PaimonConstants.COMMENT;
 
   private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA;
 
diff --git 
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonTablePropertiesMetadata.java
 
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonTablePropertiesMetadata.java
index 671dd9d66..ad63df678 100644
--- 
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonTablePropertiesMetadata.java
+++ 
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonTablePropertiesMetadata.java
@@ -35,14 +35,14 @@ import org.apache.gravitino.connector.PropertyEntry;
  */
 public class PaimonTablePropertiesMetadata extends BasePropertiesMetadata {
 
-  public static final String COMMENT = "comment";
-  public static final String OWNER = "owner";
-  public static final String BUCKET_KEY = "bucket-key";
-  public static final String MERGE_ENGINE = "merge-engine";
-  public static final String SEQUENCE_FIELD = "sequence.field";
-  public static final String ROWKIND_FIELD = "rowkind.field";
-  public static final String PRIMARY_KEY = "primary-key";
-  public static final String PARTITION = "partition";
+  public static final String COMMENT = PaimonConstants.COMMENT;
+  public static final String OWNER = PaimonConstants.OWNER;
+  public static final String BUCKET_KEY = PaimonConstants.BUCKET_KEY;
+  public static final String MERGE_ENGINE = PaimonConstants.MERGE_ENGINE;
+  public static final String SEQUENCE_FIELD = PaimonConstants.SEQUENCE_FIELD;
+  public static final String ROWKIND_FIELD = PaimonConstants.ROWKIND_FIELD;
+  public static final String PRIMARY_KEY = PaimonConstants.PRIMARY_KEY;
+  public static final String PARTITION = PaimonConstants.PARTITION;
 
   private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA;
 
diff --git 
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/storage/PaimonOSSFileSystemConfig.java
 
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/storage/PaimonOSSFileSystemConfig.java
index ad7fa26f3..7b703b5b7 100644
--- 
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/storage/PaimonOSSFileSystemConfig.java
+++ 
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/storage/PaimonOSSFileSystemConfig.java
@@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableMap;
 import java.util.Map;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.Config;
+import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants;
 import org.apache.gravitino.config.ConfigBuilder;
 import org.apache.gravitino.config.ConfigConstants;
 import org.apache.gravitino.config.ConfigEntry;
@@ -29,9 +30,9 @@ import org.apache.gravitino.connector.PropertyEntry;
 
 public class PaimonOSSFileSystemConfig extends Config {
   // OSS related properties
-  public static final String OSS_ENDPOINT = "fs.oss.endpoint";
-  public static final String OSS_ACCESS_KEY = "fs.oss.accessKeyId";
-  public static final String OSS_SECRET_KEY = "fs.oss.accessKeySecret";
+  public static final String OSS_ENDPOINT = PaimonConstants.OSS_ENDPOINT;
+  public static final String OSS_ACCESS_KEY = PaimonConstants.OSS_ACCESS_KEY;
+  public static final String OSS_SECRET_KEY = PaimonConstants.OSS_SECRET_KEY;
 
   public PaimonOSSFileSystemConfig(Map<String, String> properties) {
     super(false);
diff --git 
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/storage/PaimonS3FileSystemConfig.java
 
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/storage/PaimonS3FileSystemConfig.java
index 4184fcc06..6588e4a52 100644
--- 
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/storage/PaimonS3FileSystemConfig.java
+++ 
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/storage/PaimonS3FileSystemConfig.java
@@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableMap;
 import java.util.Map;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.Config;
+import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants;
 import org.apache.gravitino.config.ConfigBuilder;
 import org.apache.gravitino.config.ConfigConstants;
 import org.apache.gravitino.config.ConfigEntry;
@@ -29,9 +30,9 @@ import org.apache.gravitino.connector.PropertyEntry;
 
 public class PaimonS3FileSystemConfig extends Config {
   // S3 related properties
-  public static final String S3_ENDPOINT = "s3.endpoint";
-  public static final String S3_ACCESS_KEY = "s3.access-key";
-  public static final String S3_SECRET_KEY = "s3.secret-key";
+  public static final String S3_ENDPOINT = PaimonConstants.S3_ENDPOINT;
+  public static final String S3_ACCESS_KEY = PaimonConstants.S3_ACCESS_KEY;
+  public static final String S3_SECRET_KEY = PaimonConstants.S3_SECRET_KEY;
 
   public PaimonS3FileSystemConfig(Map<String, String> properties) {
     super(false);
diff --git a/docs/lakehouse-paimon-catalog.md b/docs/lakehouse-paimon-catalog.md
index d53ad4827..b67fe37db 100644
--- a/docs/lakehouse-paimon-catalog.md
+++ b/docs/lakehouse-paimon-catalog.md
@@ -29,23 +29,24 @@ Builds with Apache Paimon `0.8.0`.
 
 ### Catalog properties
 
-| Property name                                      | Description             
                                                                                
                                                                                
                    | Default value | Required                                  
                                                                                
                                           | Since Version     |
-|----------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------|
-| `catalog-backend`                                  | Catalog backend of 
Gravitino Paimon catalog. Supports `filesystem`, `jdbc` and `hive`.             
                                                                                
                         | (none)        | Yes                                  
                                                                                
                                                | 0.6.0-incubating  |
-| `uri`                                              | The URI configuration 
of the Paimon catalog. `thrift://127.0.0.1:9083` or 
`jdbc:postgresql://127.0.0.1:5432/db_name` or 
`jdbc:mysql://127.0.0.1:3306/metastore_db`. It is optional for 
`FilesystemCatalog`. | (none)        | required if the value of 
`catalog-backend` is not `filesystem`.                                          
                                                            | 0.6.0-incubating  
|
-| `warehouse`                                        | Warehouse directory of 
catalog. `file:///user/hive/warehouse-paimon/` for local fs, 
`hdfs://namespace/hdfs/path` for HDFS , `s3://{bucket-name}/path/` for S3 or 
`oss://{bucket-name}/path` for Aliyun OSS  | (none)        | Yes                
                                                                                
                                                                  | 
0.6.0-incubating  |
-| `authentication.type`                              | The type of 
authentication for Paimon catalog backend, currently Gravitino only supports 
`Kerberos` and `simple`.                                                        
                                   | `simple`      | No                         
                                                                                
                                                          | 0.6.0-incubating  |
-| `hive.metastore.sasl.enabled`                      | Whether to enable SASL 
authentication protocol when connect to Kerberos Hive metastore. This is a raw 
Hive configuration                                                              
                      | `false`       | No, This value should be true in most 
case(Some will use SSL protocol, but it rather rare) if the value of 
`gravitino.iceberg-rest.authentication.type` is Kerberos. | 0.6.0-incubating  |
-| `authentication.kerberos.principal`                | The principal of the 
Kerberos authentication.                                                        
                                                                                
                       | (none)        | required if the value of 
`authentication.type` is Kerberos.                                              
                                                            | 0.6.0-incubating  
|
-| `authentication.kerberos.keytab-uri`               | The URI of The keytab 
for the Kerberos authentication.                                                
                                                                                
                      | (none)        | required if the value of 
`authentication.type` is Kerberos.                                              
                                                            | 0.6.0-incubating  
|
-| `authentication.kerberos.check-interval-sec`       | The check interval of 
Kerberos credential for Paimon catalog.                                         
                                                                                
                      | 60            | No                                      
                                                                                
                                             | 0.6.0-incubating  |
-| `authentication.kerberos.keytab-fetch-timeout-sec` | The fetch timeout of 
retrieving Kerberos keytab from `authentication.kerberos.keytab-uri`.           
                                                                                
                       | 60            | No                                     
                                                                                
                                              | 0.6.0-incubating  |
-| `oss-endpoint`                                     | The endpoint of the 
Aliyun OSS.                                                                     
                                                                                
                        | (none)        | required if the value of `warehouse` 
is a OSS path                                                                   
                                                | 0.7.0-incubating  |
-| `oss-access-key-id`                                | The access key of the 
Aliyun OSS.                                                                     
                                                                                
                      | (none)        | required if the value of `warehouse` is 
a OSS path                                                                      
                                             | 0.7.0-incubating  |
-| `oss-accesss-key-secret`                           | The secret key the 
Aliyun OSS.                                                                     
                                                                                
                         | (none)        | required if the value of `warehouse` 
is a OSS path                                                                   
                                                | 0.7.0-incubating  |
-| `s3-endpoint`                                      | The endpoint of the AWS 
S3.                                                                             
                                                                                
                    | (none)        | required if the value of `warehouse` is a 
S3 path                                                                         
                                           | 0.7.0-incubating  |
-| `s3-access-key-id`                                 | The access key of the 
AWS S3.                                                                         
                                                                                
                      | (none)        | required if the value of `warehouse` is 
a S3 path                                                                       
                                             | 0.7.0-incubating  |
-| `s3-secret-access-key`                             | The secret key of the 
AWS S3.                                                                         
                                                                                
                      | (none)        | required if the value of `warehouse` is 
a S3 path                                                                       
                                             | 0.7.0-incubating  |
+| Property name                                      | Description             
                                                                                
                                                                                
                    | Default value                                             
                     | Required                                                 
                                                                                
              [...]
+|----------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
+| `catalog-backend`                                  | Catalog backend of 
Gravitino Paimon catalog. Supports `filesystem`, `jdbc` and `hive`.             
                                                                                
                         | (none)                                               
                          | Yes                                                 
                                                                                
                   [...]
+| `uri`                                              | The URI configuration 
of the Paimon catalog. `thrift://127.0.0.1:9083` or 
`jdbc:postgresql://127.0.0.1:5432/db_name` or 
`jdbc:mysql://127.0.0.1:3306/metastore_db`. It is optional for 
`FilesystemCatalog`. | (none)                                                   
                      | required if the value of `catalog-backend` is not 
`filesystem`.                                                                   
                     [...]
+| `warehouse`                                        | Warehouse directory of 
catalog. `file:///user/hive/warehouse-paimon/` for local fs, 
`hdfs://namespace/hdfs/path` for HDFS , `s3://{bucket-name}/path/` for S3 or 
`oss://{bucket-name}/path` for Aliyun OSS  | (none)                             
                                            | Yes                               
                                                                                
                                     [...]
+| `catalog-backend-name`                             | The catalog name passed 
to underlying Paimon catalog backend.                                           
                                                                                
                    | The property value of `catalog-backend`, like `jdbc` for 
JDBC catalog backend. | No                                                      
                                                                                
               [...]
+| `authentication.type`                              | The type of 
authentication for Paimon catalog backend, currently Gravitino only supports 
`Kerberos` and `simple`.                                                        
                                   | `simple`                                   
                                    | No                                        
                                                                                
                             [...]
+| `hive.metastore.sasl.enabled`                      | Whether to enable SASL 
authentication protocol when connect to Kerberos Hive metastore. This is a raw 
Hive configuration                                                              
                      | `false`                                                 
                       | No, this value should be true in most cases (some will 
use the SSL protocol, but it is rather rare) if the value of 
`gravitino.iceberg-rest.authentication.type [...]
+| `authentication.kerberos.principal`                | The principal of the 
Kerberos authentication.                                                        
                                                                                
                       | (none)                                                 
                        | required if the value of `authentication.type` is 
Kerberos.                                                                       
                     [...]
+| `authentication.kerberos.keytab-uri`               | The URI of The keytab 
for the Kerberos authentication.                                                
                                                                                
                      | (none)                                                  
                       | required if the value of `authentication.type` is 
Kerberos.                                                                       
                     [...]
+| `authentication.kerberos.check-interval-sec`       | The check interval of 
Kerberos credential for Paimon catalog.                                         
                                                                                
                      | 60                                                      
                       | No                                                     
                                                                                
                [...]
+| `authentication.kerberos.keytab-fetch-timeout-sec` | The fetch timeout of 
retrieving Kerberos keytab from `authentication.kerberos.keytab-uri`.           
                                                                                
                       | 60                                                     
                        | No                                                    
                                                                                
                 [...]
+| `oss-endpoint`                                     | The endpoint of the 
Aliyun OSS.                                                                     
                                                                                
                        | (none)                                                
                         | required if the value of `warehouse` is an OSS path  
                                                                                
                  [...]
+| `oss-access-key-id`                                | The access key of the 
Aliyun OSS.                                                                     
                                                                                
                      | (none)                                                  
                       | required if the value of `warehouse` is an OSS path    
                                                                                
                [...]
+| `oss-accesss-key-secret`                           | The secret key of the 
Aliyun OSS.                                                                     
                                                                                
                         | (none)                                               
                          | required if the value of `warehouse` is an OSS path 
                                                                                
                   [...]
+| `s3-endpoint`                                      | The endpoint of the AWS 
S3.                                                                             
                                                                                
                    | (none)                                                    
                     | required if the value of `warehouse` is an S3 path       
                                                                                
              [...]
+| `s3-access-key-id`                                 | The access key of the 
AWS S3.                                                                         
                                                                                
                      | (none)                                                  
                       | required if the value of `warehouse` is an S3 path     
                                                                                
                [...]
+| `s3-secret-access-key`                             | The secret key of the 
AWS S3.                                                                         
                                                                                
                      | (none)                                                  
                       | required if the value of `warehouse` is an S3 path     
                                                                                
                [...]
 
 :::note
 If you want to use the `oss` or `s3` warehouse, you need to place related jars 
in the `catalogs/lakehouse-paimon/lib` directory, more information can be found 
in the [Paimon S3](https://paimon.apache.org/docs/master/filesystems/s3/).
diff --git a/spark-connector/spark-common/build.gradle.kts 
b/spark-connector/spark-common/build.gradle.kts
index 7f3c66aa6..06e0077d2 100644
--- a/spark-connector/spark-common/build.gradle.kts
+++ b/spark-connector/spark-common/build.gradle.kts
@@ -31,6 +31,7 @@ val scalaVersion: String = project.properties["scalaVersion"] 
as? String ?: extr
 val sparkVersion: String = libs.versions.spark33.get()
 val sparkMajorVersion: String = sparkVersion.substringBeforeLast(".")
 val icebergVersion: String = libs.versions.iceberg4spark.get()
+val paimonVersion: String = libs.versions.paimon.get()
 // kyuubi hive connector for Spark 3.3 doesn't support scala 2.13
 val kyuubiVersion: String = libs.versions.kyuubi4spark34.get()
 val scalaJava8CompatVersion: String = libs.versions.scala.java.compat.get()
@@ -43,6 +44,9 @@ dependencies {
   compileOnly(project(":clients:client-java-runtime", configuration = 
"shadow"))
   
compileOnly("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
   
compileOnly("org.apache.kyuubi:kyuubi-spark-connector-hive_$scalaVersion:$kyuubiVersion")
+  
compileOnly("org.apache.paimon:paimon-spark-$sparkMajorVersion:$paimonVersion") 
{
+    exclude("org.apache.spark")
+  }
 
   compileOnly("org.apache.spark:spark-catalyst_$scalaVersion:$sparkVersion")
   compileOnly("org.apache.spark:spark-core_$scalaVersion:$sparkVersion")
@@ -114,6 +118,9 @@ dependencies {
   testImplementation("org.apache.iceberg:iceberg-core:$icebergVersion")
   
testImplementation("org.apache.iceberg:iceberg-hive-metastore:$icebergVersion")
   
testImplementation("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
+  
testImplementation("org.apache.paimon:paimon-spark-$sparkMajorVersion:$paimonVersion")
 {
+    exclude("org.apache.spark")
+  }
   
testImplementation("org.apache.kyuubi:kyuubi-spark-connector-hive_$scalaVersion:$kyuubiVersion")
   // include spark-sql,spark-catalyst,hive-common,hdfs-client
   
testImplementation("org.apache.spark:spark-hive_$scalaVersion:$sparkVersion") {
@@ -123,6 +130,9 @@ dependencies {
     exclude("org.glassfish.jersey.inject")
   }
   
testImplementation("org.scala-lang.modules:scala-collection-compat_$scalaVersion:$scalaCollectionCompatVersion")
+  
testImplementation("org.apache.spark:spark-catalyst_$scalaVersion:$sparkVersion")
+  testImplementation("org.apache.spark:spark-core_$scalaVersion:$sparkVersion")
+  testImplementation("org.apache.spark:spark-sql_$scalaVersion:$sparkVersion")
 
   testRuntimeOnly(libs.junit.jupiter.engine)
 }
diff --git 
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/BaseCatalog.java
 
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/BaseCatalog.java
index 2201bd222..5706895ca 100644
--- 
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/BaseCatalog.java
+++ 
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/BaseCatalog.java
@@ -76,11 +76,11 @@ public abstract class BaseCatalog implements TableCatalog, 
SupportsNamespaces {
   protected TableCatalog sparkCatalog;
   protected PropertiesConverter propertiesConverter;
   protected SparkTransformConverter sparkTransformConverter;
+  // The Gravitino catalog client to do schema operations.
+  protected Catalog gravitinoCatalogClient;
   private SparkTypeConverter sparkTypeConverter;
   private SparkTableChangeConverter sparkTableChangeConverter;
 
-  // The Gravitino catalog client to do schema operations.
-  private Catalog gravitinoCatalogClient;
   private String catalogName;
   private final GravitinoCatalogManager gravitinoCatalogManager;
 
diff --git 
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalog.java
 
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalog.java
new file mode 100644
index 000000000..86ca680c4
--- /dev/null
+++ 
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalog.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.spark.connector.paimon;
+
+import java.util.Map;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.catalog.lakehouse.paimon.PaimonPropertiesUtils;
+import org.apache.gravitino.spark.connector.PropertiesConverter;
+import org.apache.gravitino.spark.connector.SparkTransformConverter;
+import org.apache.gravitino.spark.connector.SparkTypeConverter;
+import org.apache.gravitino.spark.connector.catalog.BaseCatalog;
+import org.apache.paimon.spark.SparkCatalog;
+import org.apache.paimon.spark.SparkTable;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+/**
+ * Spark {@code TableCatalog} backed by a Gravitino Paimon catalog. Schema (database)
+ * operations go through the Gravitino catalog client inherited from {@code BaseCatalog},
+ * while table data operations delegate to the underlying Apache Paimon {@code SparkCatalog}.
+ */
+public class GravitinoPaimonCatalog extends BaseCatalog {
+
+  // Creates and initializes the underlying Paimon SparkCatalog. The catalog is
+  // registered under the backend catalog name resolved from the Gravitino
+  // properties, and receives options translated by the properties converter.
+  @Override
+  protected TableCatalog createAndInitSparkCatalog(
+      String name, CaseInsensitiveStringMap options, Map<String, String> 
properties) {
+    String catalogBackendName = 
PaimonPropertiesUtils.getCatalogBackendName(properties);
+    TableCatalog paimonCatalog = new SparkCatalog();
+    Map<String, String> all =
+        getPropertiesConverter().toSparkCatalogProperties(options, properties);
+    paimonCatalog.initialize(catalogBackendName, new 
CaseInsensitiveStringMap(all));
+    return paimonCatalog;
+  }
+
+  // Wraps the Paimon SparkTable with Gravitino metadata: name/schema/properties/
+  // partitioning come from the Gravitino table, scans delegate to Paimon.
+  @Override
+  protected Table createSparkTable(
+      Identifier identifier,
+      org.apache.gravitino.rel.Table gravitinoTable,
+      Table sparkTable,
+      TableCatalog sparkCatalog,
+      PropertiesConverter propertiesConverter,
+      SparkTransformConverter sparkTransformConverter,
+      SparkTypeConverter sparkTypeConverter) {
+    return new SparkPaimonTable(
+        identifier,
+        gravitinoTable,
+        (SparkTable) sparkTable,
+        propertiesConverter,
+        sparkTransformConverter,
+        sparkTypeConverter);
+  }
+
+  @Override
+  protected PropertiesConverter getPropertiesConverter() {
+    return PaimonPropertiesConverter.getInstance();
+  }
+
+  @Override
+  protected SparkTransformConverter getSparkTransformConverter() {
+    // `true` enables distribution-and-sort-order handling in transform conversion.
+    return new SparkTransformConverter(true);
+  }
+
+  // Dropping a Paimon table removes its data as well, so DROP TABLE is mapped to
+  // purgeTable on the Gravitino side; the Spark-side table cache entry is
+  // invalidated first so stale metadata is not served afterwards.
+  @Override
+  public boolean dropTable(Identifier ident) {
+    sparkCatalog.invalidateTable(ident);
+    return gravitinoCatalogClient
+        .asTableCatalog()
+        .purgeTable(NameIdentifier.of(getDatabase(ident), ident.name()));
+  }
+}
diff --git 
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/PaimonPropertiesConstants.java
 
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/PaimonPropertiesConstants.java
new file mode 100644
index 000000000..915308ae8
--- /dev/null
+++ 
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/PaimonPropertiesConstants.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.spark.connector.paimon;
+
+import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants;
+
+/**
+ * Property-key constants bridging Gravitino Paimon catalog properties
+ * (GRAVITINO_*-prefixed names) and the option keys expected by the Paimon Spark
+ * catalog (PAIMON_*-prefixed names). Values are sourced from {@link PaimonConstants}
+ * where a shared definition exists.
+ */
+// NOTE(review): constants-only class — consider adding a private constructor to
+// prevent instantiation.
+public class PaimonPropertiesConstants {
+
+  // Catalog backend selector; on the Paimon side the same value is passed as "metastore".
+  public static final String GRAVITINO_PAIMON_CATALOG_BACKEND = 
PaimonConstants.CATALOG_BACKEND;
+  static final String PAIMON_CATALOG_METASTORE = PaimonConstants.METASTORE;
+
+  // Warehouse root directory (local fs, HDFS, S3 or OSS path).
+  public static final String GRAVITINO_PAIMON_CATALOG_WAREHOUSE = 
PaimonConstants.WAREHOUSE;
+  static final String PAIMON_CATALOG_WAREHOUSE = PaimonConstants.WAREHOUSE;
+
+  // URI of the backend (e.g. Hive metastore or JDBC endpoint).
+  public static final String GRAVITINO_PAIMON_CATALOG_URI = 
PaimonConstants.URI;
+  static final String PAIMON_CATALOG_URI = PaimonConstants.URI;
+  // JDBC credentials: Gravitino-side key vs Paimon-side key differ.
+  static final String GRAVITINO_PAIMON_CATALOG_JDBC_USER = 
PaimonConstants.GRAVITINO_JDBC_USER;
+  static final String PAIMON_CATALOG_JDBC_USER = 
PaimonConstants.PAIMON_JDBC_USER;
+
+  static final String GRAVITINO_PAIMON_CATALOG_JDBC_PASSWORD =
+      PaimonConstants.GRAVITINO_JDBC_PASSWORD;
+  static final String PAIMON_CATALOG_JDBC_PASSWORD = 
PaimonConstants.PAIMON_JDBC_PASSWORD;
+
+  // Supported backend names; identical strings on both sides.
+  public static final String PAIMON_CATALOG_BACKEND_HIVE = "hive";
+  static final String GRAVITINO_PAIMON_CATALOG_BACKEND_HIVE = "hive";
+
+  static final String GRAVITINO_PAIMON_CATALOG_BACKEND_JDBC = "jdbc";
+  static final String PAIMON_CATALOG_BACKEND_JDBC = "jdbc";
+
+  public static final String PAIMON_CATALOG_BACKEND_FILESYSTEM = "filesystem";
+  static final String GRAVITINO_PAIMON_CATALOG_BACKEND_FILESYSTEM = 
"filesystem";
+
+  // Table property under which Paimon exposes the table's storage location.
+  public static final String PAIMON_TABLE_LOCATION = "path";
+}
diff --git 
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/PaimonPropertiesConverter.java
 
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/PaimonPropertiesConverter.java
new file mode 100644
index 000000000..f713ca89d
--- /dev/null
+++ 
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/PaimonPropertiesConverter.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.spark.connector.paimon;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants;
+import org.apache.gravitino.catalog.lakehouse.paimon.PaimonPropertiesUtils;
+import org.apache.gravitino.spark.connector.PropertiesConverter;
+
+/**
+ * Singleton converter translating property maps between Gravitino's Paimon catalog
+ * representation and the option/property maps used by the Paimon Spark catalog.
+ */
+public class PaimonPropertiesConverter implements PropertiesConverter {
+
+  // Initialization-on-demand holder: INSTANCE is created lazily and thread-safely
+  // by class loading.
+  // NOTE(review): the holder class is declared public; the idiom normally uses a
+  // private static nested class — confirm whether external visibility is intended.
+  public static class PaimonPropertiesConverterHolder {
+    private static final PaimonPropertiesConverter INSTANCE = new 
PaimonPropertiesConverter();
+  }
+
+  private PaimonPropertiesConverter() {}
+
+  public static PaimonPropertiesConverter getInstance() {
+    return PaimonPropertiesConverter.PaimonPropertiesConverterHolder.INSTANCE;
+  }
+
+  // Converts Gravitino catalog properties to Paimon Spark catalog options. The
+  // Gravitino "catalog-backend" entry is mandatory and is re-keyed to Paimon's
+  // "metastore" option.
+  @Override
+  public Map<String, String> toSparkCatalogProperties(Map<String, String> 
properties) {
+    Preconditions.checkArgument(properties != null, "Paimon Catalog properties 
should not be null");
+    Map<String, String> all = 
PaimonPropertiesUtils.toPaimonCatalogProperties(properties);
+    String catalogBackend = 
all.remove(PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_BACKEND);
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(catalogBackend),
+        String.format(
+            "%s should not be empty", 
PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_BACKEND));
+    all.put(PaimonPropertiesConstants.PAIMON_CATALOG_METASTORE, 
catalogBackend);
+    return all;
+  }
+
+  // Copies Spark table properties for Gravitino, stripping keys Gravitino reserves.
+  @Override
+  public Map<String, String> toGravitinoTableProperties(Map<String, String> 
properties) {
+    HashMap<String, String> gravitinoTableProperties = new 
HashMap<>(properties);
+    // The owner property of Paimon is a reserved property, so we need to 
remove it.
+    gravitinoTableProperties.remove(PaimonConstants.OWNER);
+    return gravitinoTableProperties;
+  }
+
+  // No key translation is needed in this direction; return a defensive copy.
+  @Override
+  public Map<String, String> toSparkTableProperties(Map<String, String> 
properties) {
+    return new HashMap<>(properties);
+  }
+}
diff --git 
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/SparkPaimonTable.java
 
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/SparkPaimonTable.java
new file mode 100644
index 000000000..f1db29b71
--- /dev/null
+++ 
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/SparkPaimonTable.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.spark.connector.paimon;
+
+import java.util.Map;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.spark.connector.PropertiesConverter;
+import org.apache.gravitino.spark.connector.SparkTransformConverter;
+import org.apache.gravitino.spark.connector.SparkTypeConverter;
+import org.apache.gravitino.spark.connector.utils.GravitinoTableInfoHelper;
+import org.apache.paimon.spark.SparkTable;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+/**
+ * A Paimon {@code SparkTable} whose metadata (name, schema, properties, partitioning)
+ * is served from the Gravitino table via {@code GravitinoTableInfoHelper}, while scan
+ * planning is delegated to the wrapped Paimon table.
+ */
+public class SparkPaimonTable extends SparkTable {
+
+  // Serves Gravitino-side metadata for this table.
+  private GravitinoTableInfoHelper gravitinoTableInfoHelper;
+  // The wrapped Paimon SparkTable, used for scan creation.
+  // NOTE(review): constructed from a SparkTable and cast back in newScanBuilder;
+  // typing this field as SparkTable would remove the cast. Both fields could be final.
+  private org.apache.spark.sql.connector.catalog.Table sparkTable;
+
+  public SparkPaimonTable(
+      Identifier identifier,
+      Table gravitinoTable,
+      SparkTable sparkTable,
+      PropertiesConverter propertiesConverter,
+      SparkTransformConverter sparkTransformConverter,
+      SparkTypeConverter sparkTypeConverter) {
+    super(sparkTable.getTable());
+    this.gravitinoTableInfoHelper =
+        new GravitinoTableInfoHelper(
+            true,
+            identifier,
+            gravitinoTable,
+            propertiesConverter,
+            sparkTransformConverter,
+            sparkTypeConverter);
+    this.sparkTable = sparkTable;
+  }
+
+  @Override
+  public String name() {
+    return gravitinoTableInfoHelper.name();
+  }
+
+  @Override
+  @SuppressWarnings("deprecation")
+  public StructType schema() {
+    return gravitinoTableInfoHelper.schema();
+  }
+
+  @Override
+  public Map<String, String> properties() {
+    return gravitinoTableInfoHelper.properties();
+  }
+
+  @Override
+  public Transform[] partitioning() {
+    return gravitinoTableInfoHelper.partitioning();
+  }
+
+  /**
+   * Delegates scan creation to the wrapped Paimon {@code SparkTable}. Re-implementing
+   * this here would require duplicating Paimon-specific logic for extracting the
+   * snapshotId or branchName from the options, which would be hard to maintain.
+   */
+  @Override
+  public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
+    return ((SparkTable) sparkTable).newScanBuilder(options);
+  }
+}
diff --git 
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/version/CatalogNameAdaptor.java
 
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/version/CatalogNameAdaptor.java
index 8141c799b..9392feac2 100644
--- 
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/version/CatalogNameAdaptor.java
+++ 
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/version/CatalogNameAdaptor.java
@@ -27,15 +27,24 @@ import org.apache.spark.util.VersionUtils$;
 public class CatalogNameAdaptor {
   private static final Map<String, String> catalogNames =
       ImmutableMap.of(
-          "hive-3.3", 
"org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark33",
-          "hive-3.4", 
"org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark34",
-          "hive-3.5", 
"org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark35",
+          "hive-3.3",
+          
"org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark33",
+          "hive-3.4",
+          
"org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark34",
+          "hive-3.5",
+          
"org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark35",
           "lakehouse-iceberg-3.3",
-              
"org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark33",
+          
"org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark33",
           "lakehouse-iceberg-3.4",
-              
"org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark34",
+          
"org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark34",
           "lakehouse-iceberg-3.5",
-              
"org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark35");
+          
"org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark35",
+          "lakehouse-paimon-3.3",
+          
"org.apache.gravitino.spark.connector.paimon.GravitinoPaimonCatalogSpark33",
+          "lakehouse-paimon-3.4",
+          
"org.apache.gravitino.spark.connector.paimon.GravitinoPaimonCatalogSpark34",
+          "lakehouse-paimon-3.5",
+          
"org.apache.gravitino.spark.connector.paimon.GravitinoPaimonCatalogSpark35");
 
   private static String sparkVersion() {
     return package$.MODULE$.SPARK_VERSION();
diff --git 
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java
 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java
index 63e4801ef..c7517a3bf 100644
--- 
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java
+++ 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java
@@ -117,6 +117,8 @@ public abstract class SparkCommonIT extends SparkEnvIT {
 
   protected abstract boolean supportsSchemaEvolution();
 
+  protected abstract boolean supportsReplaceColumns();
+
   // Use a custom database not the original default database because 
SparkCommonIT couldn't
   // read&write data to tables in default database. The main reason is default 
database location is
   // determined by `hive.metastore.warehouse.dir` in hive-site.xml which is 
local HDFS address
@@ -146,7 +148,7 @@ public abstract class SparkCommonIT extends SparkEnvIT {
       throw e;
     }
     sql("USE " + getCatalogName());
-    createDatabaseIfNotExists(getDefaultDatabase());
+    createDatabaseIfNotExists(getDefaultDatabase(), getProvider());
   }
 
   @BeforeEach
@@ -187,7 +189,7 @@ public abstract class SparkCommonIT extends SparkEnvIT {
   }
 
   @Test
-  void testCreateAndLoadSchema() {
+  protected void testCreateAndLoadSchema() {
     String testDatabaseName = "t_create1";
     dropDatabaseIfExists(testDatabaseName);
     sql("CREATE DATABASE " + testDatabaseName + " WITH DBPROPERTIES 
(ID=001);");
@@ -216,7 +218,7 @@ public abstract class SparkCommonIT extends SparkEnvIT {
   }
 
   @Test
-  void testAlterSchema() {
+  protected void testAlterSchema() {
     String testDatabaseName = "t_alter";
     dropDatabaseIfExists(testDatabaseName);
     sql("CREATE DATABASE " + testDatabaseName + " WITH DBPROPERTIES 
(ID=001);");
@@ -240,6 +242,7 @@ public abstract class SparkCommonIT extends SparkEnvIT {
   @Test
   void testDropSchema() {
     String testDatabaseName = "t_drop";
+    dropDatabaseIfExists(testDatabaseName);
     Set<String> databases = getDatabases();
     Assertions.assertFalse(databases.contains(testDatabaseName));
 
@@ -277,7 +280,7 @@ public abstract class SparkCommonIT extends SparkEnvIT {
     // test db.table as table identifier
     String databaseName = "db1";
     String tableName = "table1";
-    createDatabaseIfNotExists(databaseName);
+    createDatabaseIfNotExists(databaseName, getProvider());
     String tableIdentifier = String.join(".", databaseName, tableName);
 
     dropTableIfExists(tableIdentifier);
@@ -291,7 +294,7 @@ public abstract class SparkCommonIT extends SparkEnvIT {
     // use db then create table with table name
     databaseName = "db2";
     tableName = "table2";
-    createDatabaseIfNotExists(databaseName);
+    createDatabaseIfNotExists(databaseName, getProvider());
 
     sql("USE " + databaseName);
     dropTableIfExists(tableName);
@@ -379,7 +382,7 @@ public abstract class SparkCommonIT extends SparkEnvIT {
     String database = "db_list";
     String table3 = "list3";
     String table4 = "list4";
-    createDatabaseIfNotExists(database);
+    createDatabaseIfNotExists(database, getProvider());
     dropTableIfExists(String.join(".", database, table3));
     dropTableIfExists(String.join(".", database, table4));
     createSimpleTable(String.join(".", database, table3));
@@ -550,7 +553,8 @@ public abstract class SparkCommonIT extends SparkEnvIT {
   }
 
   @Test
-  void testAlterTableReplaceColumns() {
+  @EnabledIf("supportsReplaceColumns")
+  protected void testAlterTableReplaceColumns() {
     String tableName = "test_replace_columns_table";
     dropTableIfExists(tableName);
 
@@ -563,7 +567,7 @@ public abstract class SparkCommonIT extends SparkEnvIT {
 
     sql(
         String.format(
-            "ALTER TABLE %S REPLACE COLUMNS (id int COMMENT 'new comment', 
name2 string, age long);",
+            "ALTER TABLE %s REPLACE COLUMNS (id int COMMENT 'new comment', 
name2 string, age long);",
             tableName));
     ArrayList<SparkColumnInfo> updateColumns = new ArrayList<>();
     // change comment for id
diff --git 
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java
 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java
index c543d8281..b95882a0d 100644
--- 
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java
+++ 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java
@@ -79,6 +79,11 @@ public abstract class SparkHiveCatalogIT extends 
SparkCommonIT {
     return false;
   }
 
+  @Override
+  protected boolean supportsReplaceColumns() {
+    return true;
+  }
+
   @Test
   void testCreateHiveFormatPartitionTable() {
     String tableName = "hive_partition_table";
diff --git 
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java
 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java
index 52f4abf3a..f5fd337a1 100644
--- 
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java
+++ 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java
@@ -104,6 +104,11 @@ public abstract class SparkIcebergCatalogIT extends 
SparkCommonIT {
     return true;
   }
 
+  @Override
+  protected boolean supportsReplaceColumns() {
+    return true;
+  }
+
   @Override
   protected String getTableLocation(SparkTableInfo table) {
     return String.join(File.separator, table.getTableLocation(), "data");
diff --git 
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT.java
 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT.java
new file mode 100644
index 000000000..3d4a3257a
--- /dev/null
+++ 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.spark.connector.integration.test.paimon;
+
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.gravitino.spark.connector.paimon.PaimonPropertiesConstants;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+/** This class uses Apache Paimon FilesystemCatalog as the backend catalog. */
+@Tag("gravitino-docker-test")
+public abstract class SparkPaimonCatalogFilesystemBackendIT extends 
SparkPaimonCatalogIT {
+
+  @Override
+  protected Map<String, String> getCatalogConfigs() {
+    Map<String, String> catalogProperties = Maps.newHashMap();
+    catalogProperties.put(
+        PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_BACKEND,
+        PaimonPropertiesConstants.PAIMON_CATALOG_BACKEND_FILESYSTEM);
+    
catalogProperties.put(PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_WAREHOUSE,
 warehouse);
+    return catalogProperties;
+  }
+
+  @Test
+  @Override
+  protected void testCreateAndLoadSchema() {
+    String testDatabaseName = "t_create1";
+    dropDatabaseIfExists(testDatabaseName);
+    sql("CREATE DATABASE " + testDatabaseName + " WITH DBPROPERTIES 
(ID=001);");
+    Map<String, String> databaseMeta = getDatabaseMetadata(testDatabaseName);
+    // The database of the Paimon filesystem backend does not store any properties.
+    Assertions.assertFalse(databaseMeta.containsKey("ID"));
+  }
+
+  @Test
+  @Override
+  protected void testAlterSchema() {
+    String testDatabaseName = "t_alter";
+    dropDatabaseIfExists(testDatabaseName);
+    sql("CREATE DATABASE " + testDatabaseName + " WITH DBPROPERTIES 
(ID=001);");
+    Map<String, String> databaseMeta = getDatabaseMetadata(testDatabaseName);
+    // The database of the Paimon filesystem backend does not store any properties.
+    Assertions.assertFalse(databaseMeta.containsKey("ID"));
+
+    // The Paimon filesystem backend does not support the alter database operation.
+    Assertions.assertThrows(
+        UnsupportedOperationException.class,
+        () ->
+            sql(
+                String.format(
+                    "ALTER DATABASE %s SET DBPROPERTIES ('ID'='002')", 
testDatabaseName)));
+  }
+}
diff --git 
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogIT.java
 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogIT.java
new file mode 100644
index 000000000..c77a4642e
--- /dev/null
+++ 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogIT.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.spark.connector.integration.test.paimon;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.spark.connector.integration.test.SparkCommonIT;
+import 
org.apache.gravitino.spark.connector.integration.test.util.SparkTableInfo;
+import 
org.apache.gravitino.spark.connector.integration.test.util.SparkTableInfoChecker;
+import org.apache.gravitino.spark.connector.paimon.PaimonPropertiesConstants;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.sql.types.DataTypes;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public abstract class SparkPaimonCatalogIT extends SparkCommonIT {
+
+  @Override
+  protected String getCatalogName() {
+    return "paimon";
+  }
+
+  @Override
+  protected String getProvider() {
+    return "lakehouse-paimon";
+  }
+
+  @Override
+  protected boolean supportsSparkSQLClusteredBy() {
+    return false;
+  }
+
+  @Override
+  protected boolean supportsPartition() {
+    return true;
+  }
+
+  @Override
+  protected boolean supportsDelete() {
+    return false;
+  }
+
+  @Override
+  protected boolean supportsSchemaEvolution() {
+    return true;
+  }
+
+  @Override
+  protected boolean supportsReplaceColumns() {
+    // Paimon doesn't support replace columns, because it doesn't support dropping all fields in a
+    // table. The `ALTER TABLE REPLACE COLUMNS` statement removes all existing columns first and
+    // then adds the new set of columns.
+    return false;
+  }
+
+  @Override
+  protected String getTableLocation(SparkTableInfo table) {
+    Map<String, String> tableProperties = table.getTableProperties();
+    return 
tableProperties.get(PaimonPropertiesConstants.PAIMON_TABLE_LOCATION);
+  }
+
+  @Test
+  void testPaimonPartitions() {
+    String partitionPathString = "name=a/address=beijing";
+
+    String tableName = "test_paimon_partition_table";
+    dropTableIfExists(tableName);
+    String createTableSQL = getCreatePaimonSimpleTableString(tableName);
+    createTableSQL = createTableSQL + " PARTITIONED BY (name, address);";
+    sql(createTableSQL);
+    SparkTableInfo tableInfo = getTableInfo(tableName);
+    SparkTableInfoChecker checker =
+        SparkTableInfoChecker.create()
+            .withName(tableName)
+            .withColumns(getPaimonSimpleTableColumn())
+            .withIdentifyPartition(Collections.singletonList("name"))
+            .withIdentifyPartition(Collections.singletonList("address"));
+    checker.check(tableInfo);
+
+    String insertData = String.format("INSERT into %s 
values(2,'a','beijing');", tableName);
+    sql(insertData);
+    List<String> queryResult = getTableData(tableName);
+    Assertions.assertEquals(1, queryResult.size());
+    Assertions.assertEquals("2,a,beijing", queryResult.get(0));
+    Path partitionPath = new Path(getTableLocation(tableInfo), 
partitionPathString);
+    checkDirExists(partitionPath);
+  }
+
+  private String getCreatePaimonSimpleTableString(String tableName) {
+    return String.format(
+        "CREATE TABLE %s (id INT COMMENT 'id comment', name STRING COMMENT '', 
address STRING COMMENT '') USING paimon",
+        tableName);
+  }
+
+  private List<SparkTableInfo.SparkColumnInfo> getPaimonSimpleTableColumn() {
+    return Arrays.asList(
+        SparkTableInfo.SparkColumnInfo.of("id", DataTypes.IntegerType, "id 
comment"),
+        SparkTableInfo.SparkColumnInfo.of("name", DataTypes.StringType, ""),
+        SparkTableInfo.SparkColumnInfo.of("address", DataTypes.StringType, 
""));
+  }
+}
diff --git 
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkTableInfo.java
 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkTableInfo.java
index 38b21ddf0..077936c29 100644
--- 
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkTableInfo.java
+++ 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkTableInfo.java
@@ -31,6 +31,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.spark.connector.ConnectorConstants;
 import org.apache.gravitino.spark.connector.hive.SparkHiveTable;
 import org.apache.gravitino.spark.connector.iceberg.SparkIcebergTable;
+import org.apache.gravitino.spark.connector.paimon.SparkPaimonTable;
 import org.apache.spark.sql.connector.catalog.SupportsMetadataColumns;
 import org.apache.spark.sql.connector.catalog.Table;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
@@ -71,6 +72,10 @@ public class SparkTableInfo {
     return tableProperties.get(TableCatalog.PROP_LOCATION);
   }
 
+  public Map<String, String> getTableProperties() {
+    return tableProperties;
+  }
+
   // Include database name and table name
   public String getTableIdentifier() {
     if (StringUtils.isNotBlank(database)) {
@@ -186,6 +191,8 @@ public class SparkTableInfo {
       return ((SparkHiveTable) baseTable).schema();
     } else if (baseTable instanceof SparkIcebergTable) {
       return ((SparkIcebergTable) baseTable).schema();
+    } else if (baseTable instanceof SparkPaimonTable) {
+      return ((SparkPaimonTable) baseTable).schema();
     } else {
       throw new IllegalArgumentException(
           "Doesn't support Spark table: " + baseTable.getClass().getName());
diff --git 
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkUtilIT.java
 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkUtilIT.java
index 646f41484..ed7d2085f 100644
--- 
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkUtilIT.java
+++ 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkUtilIT.java
@@ -74,10 +74,13 @@ public abstract class SparkUtilIT extends BaseIT {
 
   // Specify Location explicitly because the default location is local HDFS, 
Spark will expand the
   // location to HDFS.
-  protected void createDatabaseIfNotExists(String database) {
-    sql(
-        String.format(
-            "CREATE DATABASE IF NOT EXISTS %s LOCATION '/user/hive/%s'", 
database, database));
+  // However, Paimon does not support creating a database with a specified location.
+  protected void createDatabaseIfNotExists(String database, String provider) {
+    String locationClause =
+        "lakehouse-paimon".equalsIgnoreCase(provider)
+            ? ""
+            : String.format("LOCATION '/user/hive/%s'", database);
+    sql(String.format("CREATE DATABASE IF NOT EXISTS %s %s", database, 
locationClause));
   }
 
   protected Map<String, String> getDatabaseMetadata(String database) {
diff --git 
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/paimon/TestPaimonPropertiesConverter.java
 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/paimon/TestPaimonPropertiesConverter.java
new file mode 100644
index 000000000..a3a0e9128
--- /dev/null
+++ 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/paimon/TestPaimonPropertiesConverter.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.spark.connector.paimon;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestPaimonPropertiesConverter {
+  private final PaimonPropertiesConverter paimonPropertiesConverter =
+      PaimonPropertiesConverter.getInstance();
+
+  @Test
+  void testCatalogPropertiesWithHiveBackend() {
+    Map<String, String> properties =
+        paimonPropertiesConverter.toSparkCatalogProperties(
+            ImmutableMap.of(
+                PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_BACKEND,
+                
PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_BACKEND_HIVE,
+                PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_URI,
+                "hive-uri",
+                PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_WAREHOUSE,
+                "hive-warehouse",
+                "key1",
+                "value1"));
+    Assertions.assertEquals(
+        ImmutableMap.of(
+            PaimonPropertiesConstants.PAIMON_CATALOG_METASTORE,
+            PaimonPropertiesConstants.PAIMON_CATALOG_BACKEND_HIVE,
+            PaimonPropertiesConstants.PAIMON_CATALOG_URI,
+            "hive-uri",
+            PaimonPropertiesConstants.PAIMON_CATALOG_WAREHOUSE,
+            "hive-warehouse"),
+        properties);
+  }
+
+  @Test
+  void testCatalogPropertiesWithJdbcBackend() {
+    Map<String, String> properties =
+        paimonPropertiesConverter.toSparkCatalogProperties(
+            ImmutableMap.of(
+                PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_BACKEND,
+                PaimonPropertiesConstants.PAIMON_CATALOG_BACKEND_JDBC,
+                PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_URI,
+                "jdbc-uri",
+                PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_WAREHOUSE,
+                "jdbc-warehouse",
+                PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_JDBC_USER,
+                "user",
+                
PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_JDBC_PASSWORD,
+                "passwd",
+                "key1",
+                "value1"));
+    Assertions.assertEquals(
+        ImmutableMap.of(
+            PaimonPropertiesConstants.PAIMON_CATALOG_METASTORE,
+            PaimonPropertiesConstants.PAIMON_CATALOG_BACKEND_JDBC,
+            PaimonPropertiesConstants.PAIMON_CATALOG_URI,
+            "jdbc-uri",
+            PaimonPropertiesConstants.PAIMON_CATALOG_WAREHOUSE,
+            "jdbc-warehouse",
+            PaimonPropertiesConstants.PAIMON_CATALOG_JDBC_USER,
+            "user",
+            PaimonPropertiesConstants.PAIMON_CATALOG_JDBC_PASSWORD,
+            "passwd"),
+        properties);
+  }
+
+  @Test
+  void testCatalogPropertiesWithFilesystemBackend() {
+    Map<String, String> properties =
+        paimonPropertiesConverter.toSparkCatalogProperties(
+            ImmutableMap.of(
+                PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_BACKEND,
+                
PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_BACKEND_FILESYSTEM,
+                PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_WAREHOUSE,
+                "filesystem-warehouse",
+                "key1",
+                "value1"));
+    Assertions.assertEquals(
+        ImmutableMap.of(
+            PaimonPropertiesConstants.PAIMON_CATALOG_METASTORE,
+            PaimonPropertiesConstants.PAIMON_CATALOG_BACKEND_FILESYSTEM,
+            PaimonPropertiesConstants.PAIMON_CATALOG_WAREHOUSE,
+            "filesystem-warehouse"),
+        properties);
+  }
+}
diff --git a/spark-connector/v3.3/spark/build.gradle.kts 
b/spark-connector/v3.3/spark/build.gradle.kts
index c4c417d62..66c65f863 100644
--- a/spark-connector/v3.3/spark/build.gradle.kts
+++ b/spark-connector/v3.3/spark/build.gradle.kts
@@ -31,6 +31,7 @@ val scalaVersion: String = project.properties["scalaVersion"] 
as? String ?: extr
 val sparkVersion: String = libs.versions.spark33.get()
 val sparkMajorVersion: String = sparkVersion.substringBeforeLast(".")
 val icebergVersion: String = libs.versions.iceberg4spark.get()
+val paimonVersion: String = libs.versions.paimon.get()
 val kyuubiVersion: String = libs.versions.kyuubi4spark33.get()
 val scalaJava8CompatVersion: String = libs.versions.scala.java.compat.get()
 val scalaCollectionCompatVersion: String = 
libs.versions.scala.collection.compat.get()
@@ -43,6 +44,9 @@ dependencies {
     exclude("com.fasterxml.jackson")
   }
   
compileOnly("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
+  
compileOnly("org.apache.paimon:paimon-spark-$sparkMajorVersion:$paimonVersion") 
{
+    exclude("org.apache.spark")
+  }
 
   testImplementation(project(":api")) {
     exclude("org.apache.logging.log4j")
@@ -122,6 +126,9 @@ dependencies {
   testImplementation("org.apache.iceberg:iceberg-core:$icebergVersion")
   
testImplementation("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
   
testImplementation("org.apache.iceberg:iceberg-hive-metastore:$icebergVersion")
+  
testImplementation("org.apache.paimon:paimon-spark-$sparkMajorVersion:$paimonVersion")
 {
+    exclude("org.apache.spark")
+  }
   
testImplementation("org.apache.kyuubi:kyuubi-spark-connector-hive_$scalaVersion:$kyuubiVersion")
   // include spark-sql,spark-catalyst,hive-common,hdfs-client
   
testImplementation("org.apache.spark:spark-hive_$scalaVersion:$sparkVersion") {
@@ -134,6 +141,9 @@ dependencies {
     exclude("com.fasterxml.jackson.core")
   }
   
testImplementation("org.scala-lang.modules:scala-collection-compat_$scalaVersion:$scalaCollectionCompatVersion")
+  
testImplementation("org.apache.spark:spark-catalyst_$scalaVersion:$sparkVersion")
+  testImplementation("org.apache.spark:spark-core_$scalaVersion:$sparkVersion")
+  testImplementation("org.apache.spark:spark-sql_$scalaVersion:$sparkVersion")
 
   testRuntimeOnly(libs.junit.jupiter.engine)
 }
@@ -152,6 +162,7 @@ tasks.test {
     dependsOn(":catalogs:catalog-lakehouse-iceberg:jar")
     dependsOn(":catalogs:catalog-hive:jar")
     dependsOn(":iceberg:iceberg-rest-server:jar")
+    dependsOn(":catalogs:catalog-lakehouse-paimon:jar")
   }
 }
 
diff --git 
a/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
 
b/spark-connector/v3.3/spark/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalogSpark33.java
similarity index 52%
copy from 
spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
copy to 
spark-connector/v3.3/spark/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalogSpark33.java
index 1b0af02f8..2fef911a8 100644
--- 
a/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
+++ 
b/spark-connector/v3.3/spark/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalogSpark33.java
@@ -16,20 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.gravitino.spark.connector.version;
+package org.apache.gravitino.spark.connector.paimon;
 
-import org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark33;
-import 
org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark33;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-public class TestCatalogNameAdaptor {
-  @Test
-  void testSpark33() {
-    String hiveCatalogName = CatalogNameAdaptor.getCatalogName("hive");
-    Assertions.assertEquals(GravitinoHiveCatalogSpark33.class.getName(), 
hiveCatalogName);
-
-    String icebergCatalogName = 
CatalogNameAdaptor.getCatalogName("lakehouse-iceberg");
-    Assertions.assertEquals(GravitinoIcebergCatalogSpark33.class.getName(), 
icebergCatalogName);
-  }
-}
+public class GravitinoPaimonCatalogSpark33 extends GravitinoPaimonCatalog {}
diff --git 
a/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
 
b/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT33.java
similarity index 58%
copy from 
spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
copy to 
spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT33.java
index 5295e82fb..839b959c7 100644
--- 
a/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
+++ 
b/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT33.java
@@ -16,20 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.gravitino.spark.connector.version;
+package org.apache.gravitino.spark.connector.integration.test.paimon;
 
-import org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark35;
-import 
org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark35;
+import 
org.apache.gravitino.spark.connector.paimon.GravitinoPaimonCatalogSpark33;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
-public class TestCatalogNameAdaptor {
+public class SparkPaimonCatalogFilesystemBackendIT33 extends 
SparkPaimonCatalogFilesystemBackendIT {
   @Test
-  void testSpark35() {
-    String hiveCatalogName = CatalogNameAdaptor.getCatalogName("hive");
-    Assertions.assertEquals(GravitinoHiveCatalogSpark35.class.getName(), 
hiveCatalogName);
-
-    String icebergCatalogName = 
CatalogNameAdaptor.getCatalogName("lakehouse-iceberg");
-    Assertions.assertEquals(GravitinoIcebergCatalogSpark35.class.getName(), 
icebergCatalogName);
+  void testCatalogClassName() {
+    String catalogClass =
+        getSparkSession()
+            .sessionState()
+            .conf()
+            .getConfString("spark.sql.catalog." + getCatalogName());
+    Assertions.assertEquals(GravitinoPaimonCatalogSpark33.class.getName(), 
catalogClass);
   }
 }
diff --git 
a/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
 
b/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
index 1b0af02f8..37c95e478 100644
--- 
a/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
+++ 
b/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
@@ -20,6 +20,7 @@ package org.apache.gravitino.spark.connector.version;
 
 import org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark33;
 import 
org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark33;
+import 
org.apache.gravitino.spark.connector.paimon.GravitinoPaimonCatalogSpark33;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -31,5 +32,8 @@ public class TestCatalogNameAdaptor {
 
     String icebergCatalogName = 
CatalogNameAdaptor.getCatalogName("lakehouse-iceberg");
     Assertions.assertEquals(GravitinoIcebergCatalogSpark33.class.getName(), 
icebergCatalogName);
+
+    String paimonCatalogName = 
CatalogNameAdaptor.getCatalogName("lakehouse-paimon");
+    Assertions.assertEquals(GravitinoPaimonCatalogSpark33.class.getName(), 
paimonCatalogName);
   }
 }
diff --git a/spark-connector/v3.4/spark/build.gradle.kts 
b/spark-connector/v3.4/spark/build.gradle.kts
index f3308fca3..aa4134a3c 100644
--- a/spark-connector/v3.4/spark/build.gradle.kts
+++ b/spark-connector/v3.4/spark/build.gradle.kts
@@ -31,6 +31,7 @@ val scalaVersion: String = project.properties["scalaVersion"] 
as? String ?: extr
 val sparkVersion: String = libs.versions.spark34.get()
 val sparkMajorVersion: String = sparkVersion.substringBeforeLast(".")
 val icebergVersion: String = libs.versions.iceberg4spark.get()
+val paimonVersion: String = libs.versions.paimon.get()
 val kyuubiVersion: String = libs.versions.kyuubi4spark34.get()
 val scalaJava8CompatVersion: String = libs.versions.scala.java.compat.get()
 val scalaCollectionCompatVersion: String = 
libs.versions.scala.collection.compat.get()
@@ -44,6 +45,9 @@ dependencies {
   }
   compileOnly(project(":clients:client-java-runtime", configuration = 
"shadow"))
   
compileOnly("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
+  
compileOnly("org.apache.paimon:paimon-spark-$sparkMajorVersion:$paimonVersion") 
{
+    exclude("org.apache.spark")
+  }
 
   testImplementation(project(":api")) {
     exclude("org.apache.logging.log4j")
@@ -122,6 +126,9 @@ dependencies {
   testImplementation("org.apache.iceberg:iceberg-core:$icebergVersion")
   
testImplementation("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
   
testImplementation("org.apache.iceberg:iceberg-hive-metastore:$icebergVersion")
+  
testImplementation("org.apache.paimon:paimon-spark-$sparkMajorVersion:$paimonVersion")
 {
+    exclude("org.apache.spark")
+  }
   
testImplementation("org.apache.kyuubi:kyuubi-spark-connector-hive_$scalaVersion:$kyuubiVersion")
   // include spark-sql,spark-catalyst,hive-common,hdfs-client
   
testImplementation("org.apache.spark:spark-hive_$scalaVersion:$sparkVersion") {
@@ -134,6 +141,9 @@ dependencies {
     exclude("com.fasterxml.jackson.core")
   }
   
testImplementation("org.scala-lang.modules:scala-collection-compat_$scalaVersion:$scalaCollectionCompatVersion")
+  
testImplementation("org.apache.spark:spark-catalyst_$scalaVersion:$sparkVersion")
+  testImplementation("org.apache.spark:spark-core_$scalaVersion:$sparkVersion")
+  testImplementation("org.apache.spark:spark-sql_$scalaVersion:$sparkVersion")
 
   testRuntimeOnly(libs.junit.jupiter.engine)
 }
@@ -152,6 +162,7 @@ tasks.test {
     dependsOn(":catalogs:catalog-lakehouse-iceberg:jar")
     dependsOn(":catalogs:catalog-hive:jar")
     dependsOn(":iceberg:iceberg-rest-server:jar")
+    dependsOn(":catalogs:catalog-lakehouse-paimon:jar")
   }
 }
 
diff --git 
a/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
 
b/spark-connector/v3.4/spark/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalogSpark34.java
similarity index 53%
copy from 
spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
copy to 
spark-connector/v3.4/spark/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalogSpark34.java
index 5295e82fb..eb3e87793 100644
--- 
a/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
+++ 
b/spark-connector/v3.4/spark/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalogSpark34.java
@@ -16,20 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.gravitino.spark.connector.version;
+package org.apache.gravitino.spark.connector.paimon;
 
-import org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark35;
-import 
org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark35;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+import org.apache.gravitino.spark.connector.SparkTableChangeConverter;
+import org.apache.gravitino.spark.connector.SparkTableChangeConverter34;
+import org.apache.gravitino.spark.connector.SparkTypeConverter;
+import org.apache.gravitino.spark.connector.SparkTypeConverter34;
 
-public class TestCatalogNameAdaptor {
-  @Test
-  void testSpark35() {
-    String hiveCatalogName = CatalogNameAdaptor.getCatalogName("hive");
-    Assertions.assertEquals(GravitinoHiveCatalogSpark35.class.getName(), 
hiveCatalogName);
+public class GravitinoPaimonCatalogSpark34 extends GravitinoPaimonCatalog {
+  @Override
+  protected SparkTypeConverter getSparkTypeConverter() {
+    return new SparkTypeConverter34();
+  }
 
-    String icebergCatalogName = 
CatalogNameAdaptor.getCatalogName("lakehouse-iceberg");
-    Assertions.assertEquals(GravitinoIcebergCatalogSpark35.class.getName(), 
icebergCatalogName);
+  @Override
+  protected SparkTableChangeConverter getSparkTableChangeConverter(
+      SparkTypeConverter sparkTypeConverter) {
+    return new SparkTableChangeConverter34(sparkTypeConverter);
   }
 }
diff --git 
a/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
 
b/spark-connector/v3.4/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT34.java
similarity index 59%
copy from 
spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
copy to 
spark-connector/v3.4/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT34.java
index 5295e82fb..d23070732 100644
--- 
a/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
+++ 
b/spark-connector/v3.4/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT34.java
@@ -16,20 +16,21 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.gravitino.spark.connector.version;
+package org.apache.gravitino.spark.connector.integration.test.paimon;
 
-import org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark35;
-import 
org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark35;
+import 
org.apache.gravitino.spark.connector.paimon.GravitinoPaimonCatalogSpark34;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
-public class TestCatalogNameAdaptor {
-  @Test
-  void testSpark35() {
-    String hiveCatalogName = CatalogNameAdaptor.getCatalogName("hive");
-    Assertions.assertEquals(GravitinoHiveCatalogSpark35.class.getName(), 
hiveCatalogName);
+public class SparkPaimonCatalogFilesystemBackendIT34 extends 
SparkPaimonCatalogFilesystemBackendIT {
 
-    String icebergCatalogName = 
CatalogNameAdaptor.getCatalogName("lakehouse-iceberg");
-    Assertions.assertEquals(GravitinoIcebergCatalogSpark35.class.getName(), 
icebergCatalogName);
+  @Test
+  void testCatalogClassName() {
+    String catalogClass =
+        getSparkSession()
+            .sessionState()
+            .conf()
+            .getConfString("spark.sql.catalog." + getCatalogName());
+    Assertions.assertEquals(GravitinoPaimonCatalogSpark34.class.getName(), 
catalogClass);
   }
 }
diff --git 
a/spark-connector/v3.4/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
 
b/spark-connector/v3.4/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
index a2e95c8ea..af9e67fab 100644
--- 
a/spark-connector/v3.4/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
+++ 
b/spark-connector/v3.4/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
@@ -20,6 +20,7 @@ package org.apache.gravitino.spark.connector.version;
 
 import org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark34;
 import 
org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark34;
+import 
org.apache.gravitino.spark.connector.paimon.GravitinoPaimonCatalogSpark34;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -31,5 +32,8 @@ public class TestCatalogNameAdaptor {
 
     String icebergCatalogName = 
CatalogNameAdaptor.getCatalogName("lakehouse-iceberg");
     Assertions.assertEquals(GravitinoIcebergCatalogSpark34.class.getName(), 
icebergCatalogName);
+
+    String paimonCatalogName = 
CatalogNameAdaptor.getCatalogName("lakehouse-paimon");
+    Assertions.assertEquals(GravitinoPaimonCatalogSpark34.class.getName(), 
paimonCatalogName);
   }
 }
diff --git a/spark-connector/v3.5/spark/build.gradle.kts 
b/spark-connector/v3.5/spark/build.gradle.kts
index 7b8cc8447..15aa01808 100644
--- a/spark-connector/v3.5/spark/build.gradle.kts
+++ b/spark-connector/v3.5/spark/build.gradle.kts
@@ -31,6 +31,7 @@ val scalaVersion: String = project.properties["scalaVersion"] 
as? String ?: extr
 val sparkVersion: String = libs.versions.spark35.get()
 val sparkMajorVersion: String = sparkVersion.substringBeforeLast(".")
 val icebergVersion: String = libs.versions.iceberg4spark.get()
+val paimonVersion: String = libs.versions.paimon.get()
 val kyuubiVersion: String = libs.versions.kyuubi4spark35.get()
 val scalaJava8CompatVersion: String = libs.versions.scala.java.compat.get()
 val scalaCollectionCompatVersion: String = 
libs.versions.scala.collection.compat.get()
@@ -45,6 +46,9 @@ dependencies {
   }
   compileOnly(project(":clients:client-java-runtime", configuration = 
"shadow"))
   
compileOnly("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
+  
compileOnly("org.apache.paimon:paimon-spark-$sparkMajorVersion:$paimonVersion") 
{
+    exclude("org.apache.spark")
+  }
 
   testImplementation(project(":api")) {
     exclude("org.apache.logging.log4j")
@@ -124,6 +128,9 @@ dependencies {
   testImplementation("org.apache.iceberg:iceberg-core:$icebergVersion")
   
testImplementation("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
   
testImplementation("org.apache.iceberg:iceberg-hive-metastore:$icebergVersion")
+  
testImplementation("org.apache.paimon:paimon-spark-$sparkMajorVersion:$paimonVersion")
 {
+    exclude("org.apache.spark")
+  }
   
testImplementation("org.apache.kyuubi:kyuubi-spark-connector-hive_$scalaVersion:$kyuubiVersion")
   // include spark-sql,spark-catalyst,hive-common,hdfs-client
   
testImplementation("org.apache.spark:spark-hive_$scalaVersion:$sparkVersion") {
@@ -136,6 +143,9 @@ dependencies {
     exclude("com.fasterxml.jackson.core")
   }
   
testImplementation("org.scala-lang.modules:scala-collection-compat_$scalaVersion:$scalaCollectionCompatVersion")
+  
testImplementation("org.apache.spark:spark-catalyst_$scalaVersion:$sparkVersion")
+  testImplementation("org.apache.spark:spark-core_$scalaVersion:$sparkVersion")
+  testImplementation("org.apache.spark:spark-sql_$scalaVersion:$sparkVersion")
 
   testRuntimeOnly(libs.junit.jupiter.engine)
 }
@@ -154,6 +164,7 @@ tasks.test {
     dependsOn(":catalogs:catalog-lakehouse-iceberg:jar")
     dependsOn(":catalogs:catalog-hive:jar")
     dependsOn(":iceberg:iceberg-rest-server:jar")
+    dependsOn(":catalogs:catalog-lakehouse-paimon:jar")
   }
 }
 
diff --git 
a/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
 
b/spark-connector/v3.5/spark/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalogSpark35.java
similarity index 52%
copy from 
spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
copy to 
spark-connector/v3.5/spark/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalogSpark35.java
index 1b0af02f8..2c39af5b2 100644
--- 
a/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
+++ 
b/spark-connector/v3.5/spark/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalogSpark35.java
@@ -16,20 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.gravitino.spark.connector.version;
+package org.apache.gravitino.spark.connector.paimon;
 
-import org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark33;
-import 
org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark33;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-public class TestCatalogNameAdaptor {
-  @Test
-  void testSpark33() {
-    String hiveCatalogName = CatalogNameAdaptor.getCatalogName("hive");
-    Assertions.assertEquals(GravitinoHiveCatalogSpark33.class.getName(), 
hiveCatalogName);
-
-    String icebergCatalogName = 
CatalogNameAdaptor.getCatalogName("lakehouse-iceberg");
-    Assertions.assertEquals(GravitinoIcebergCatalogSpark33.class.getName(), 
icebergCatalogName);
-  }
-}
+public class GravitinoPaimonCatalogSpark35 extends 
GravitinoPaimonCatalogSpark34 {}
diff --git 
a/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
 
b/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT35.java
similarity index 59%
copy from 
spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
copy to 
spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT35.java
index 5295e82fb..44281c76e 100644
--- 
a/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
+++ 
b/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT35.java
@@ -16,20 +16,21 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.gravitino.spark.connector.version;
+package org.apache.gravitino.spark.connector.integration.test.paimon;
 
-import org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark35;
-import 
org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark35;
+import 
org.apache.gravitino.spark.connector.paimon.GravitinoPaimonCatalogSpark35;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
-public class TestCatalogNameAdaptor {
-  @Test
-  void testSpark35() {
-    String hiveCatalogName = CatalogNameAdaptor.getCatalogName("hive");
-    Assertions.assertEquals(GravitinoHiveCatalogSpark35.class.getName(), 
hiveCatalogName);
+public class SparkPaimonCatalogFilesystemBackendIT35 extends 
SparkPaimonCatalogFilesystemBackendIT {
 
-    String icebergCatalogName = 
CatalogNameAdaptor.getCatalogName("lakehouse-iceberg");
-    Assertions.assertEquals(GravitinoIcebergCatalogSpark35.class.getName(), 
icebergCatalogName);
+  @Test
+  void testCatalogClassName() {
+    String catalogClass =
+        getSparkSession()
+            .sessionState()
+            .conf()
+            .getConfString("spark.sql.catalog." + getCatalogName());
+    Assertions.assertEquals(GravitinoPaimonCatalogSpark35.class.getName(), 
catalogClass);
   }
 }
diff --git 
a/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
 
b/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
index 5295e82fb..f02584cd6 100644
--- 
a/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
+++ 
b/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
@@ -20,6 +20,7 @@ package org.apache.gravitino.spark.connector.version;
 
 import org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark35;
 import 
org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark35;
+import 
org.apache.gravitino.spark.connector.paimon.GravitinoPaimonCatalogSpark35;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -31,5 +32,8 @@ public class TestCatalogNameAdaptor {
 
     String icebergCatalogName = 
CatalogNameAdaptor.getCatalogName("lakehouse-iceberg");
     Assertions.assertEquals(GravitinoIcebergCatalogSpark35.class.getName(), 
icebergCatalogName);
+
+    String paimonCatalogName = 
CatalogNameAdaptor.getCatalogName("lakehouse-paimon");
+    Assertions.assertEquals(GravitinoPaimonCatalogSpark35.class.getName(), 
paimonCatalogName);
   }
 }

Reply via email to