This is an automated email from the ASF dual-hosted git repository.
fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 873217554 [#4722] feat(paimon-spark-connector): support schema and
table DDL and table DML for GravitinoPaimonCatalog in paimon spark connector
(#5722)
873217554 is described below
commit 873217554c7022f191013605133c43872e8e5bf6
Author: cai can <[email protected]>
AuthorDate: Mon Dec 16 11:36:10 2024 +0800
[#4722] feat(paimon-spark-connector): support schema and table DDL and
table DML for GravitinoPaimonCatalog in paimon spark connector (#5722)
### What changes were proposed in this pull request?
support schema and table DDL and table DML for GravitinoPaimonCatalog in
paimon spark connector.
### Why are the changes needed?
Fix:
https://github.com/apache/gravitino/issues/4722
https://github.com/apache/gravitino/issues/4717
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
new Its and UTs.
---------
Co-authored-by: caican <[email protected]>
---
.../catalog/lakehouse/paimon/PaimonConstants.java | 57 ++++++++++
.../lakehouse/paimon/PaimonPropertiesUtils.java | 95 ++++++++++++++++
.../paimon/PaimonCatalogPropertiesMetadata.java | 26 ++---
.../paimon/PaimonSchemaPropertiesMetadata.java | 2 +-
.../paimon/PaimonTablePropertiesMetadata.java | 16 +--
.../paimon/storage/PaimonOSSFileSystemConfig.java | 7 +-
.../paimon/storage/PaimonS3FileSystemConfig.java | 7 +-
docs/lakehouse-paimon-catalog.md | 35 +++---
spark-connector/spark-common/build.gradle.kts | 10 ++
.../spark/connector/catalog/BaseCatalog.java | 4 +-
.../connector/paimon/GravitinoPaimonCatalog.java | 84 +++++++++++++++
.../paimon/PaimonPropertiesConstants.java | 51 +++++++++
.../paimon/PaimonPropertiesConverter.java | 67 ++++++++++++
.../spark/connector/paimon/SparkPaimonTable.java | 88 +++++++++++++++
.../connector/version/CatalogNameAdaptor.java | 21 ++--
.../connector/integration/test/SparkCommonIT.java | 20 ++--
.../integration/test/hive/SparkHiveCatalogIT.java | 5 +
.../test/iceberg/SparkIcebergCatalogIT.java | 5 +
.../SparkPaimonCatalogFilesystemBackendIT.java | 71 ++++++++++++
.../test/paimon/SparkPaimonCatalogIT.java | 119 +++++++++++++++++++++
.../integration/test/util/SparkTableInfo.java | 7 ++
.../integration/test/util/SparkUtilIT.java | 11 +-
.../paimon/TestPaimonPropertiesConverter.java | 106 ++++++++++++++++++
spark-connector/v3.3/spark/build.gradle.kts | 11 ++
.../paimon/GravitinoPaimonCatalogSpark33.java} | 18 +---
.../SparkPaimonCatalogFilesystemBackendIT33.java} | 20 ++--
.../connector/version/TestCatalogNameAdaptor.java | 4 +
spark-connector/v3.4/spark/build.gradle.kts | 11 ++
.../paimon/GravitinoPaimonCatalogSpark34.java} | 26 ++---
.../SparkPaimonCatalogFilesystemBackendIT34.java} | 21 ++--
.../connector/version/TestCatalogNameAdaptor.java | 4 +
spark-connector/v3.5/spark/build.gradle.kts | 11 ++
.../paimon/GravitinoPaimonCatalogSpark35.java} | 18 +---
.../SparkPaimonCatalogFilesystemBackendIT35.java} | 21 ++--
.../connector/version/TestCatalogNameAdaptor.java | 4 +
35 files changed, 945 insertions(+), 138 deletions(-)
diff --git
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonConstants.java
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonConstants.java
new file mode 100644
index 000000000..291a7ea96
--- /dev/null
+++
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonConstants.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.lakehouse.paimon;
+
+public class PaimonConstants {
+
+ // Paimon catalog properties constants
+ public static final String CATALOG_BACKEND = "catalog-backend";
+ public static final String METASTORE = "metastore";
+ public static final String URI = "uri";
+ public static final String WAREHOUSE = "warehouse";
+ public static final String CATALOG_BACKEND_NAME = "catalog-backend-name";
+
+ public static final String GRAVITINO_JDBC_USER = "jdbc-user";
+ public static final String PAIMON_JDBC_USER = "jdbc.user";
+
+ public static final String GRAVITINO_JDBC_PASSWORD = "jdbc-password";
+ public static final String PAIMON_JDBC_PASSWORD = "jdbc.password";
+
+ public static final String GRAVITINO_JDBC_DRIVER = "jdbc-driver";
+
+ // S3 properties needed by Paimon
+ public static final String S3_ENDPOINT = "s3.endpoint";
+ public static final String S3_ACCESS_KEY = "s3.access-key";
+ public static final String S3_SECRET_KEY = "s3.secret-key";
+
+ // OSS related properties
+ public static final String OSS_ENDPOINT = "fs.oss.endpoint";
+ public static final String OSS_ACCESS_KEY = "fs.oss.accessKeyId";
+ public static final String OSS_SECRET_KEY = "fs.oss.accessKeySecret";
+
+ // Iceberg Table properties constants
+ public static final String COMMENT = "comment";
+ public static final String OWNER = "owner";
+ public static final String BUCKET_KEY = "bucket-key";
+ public static final String MERGE_ENGINE = "merge-engine";
+ public static final String SEQUENCE_FIELD = "sequence.field";
+ public static final String ROWKIND_FIELD = "rowkind.field";
+ public static final String PRIMARY_KEY = "primary-key";
+ public static final String PARTITION = "partition";
+}
diff --git
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonPropertiesUtils.java
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonPropertiesUtils.java
new file mode 100644
index 000000000..0dcf24f3a
--- /dev/null
+++
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonPropertiesUtils.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.lakehouse.paimon;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.gravitino.storage.OSSProperties;
+import org.apache.gravitino.storage.S3Properties;
+
+public class PaimonPropertiesUtils {
+
+ // Map that maintains the mapping of keys in Gravitino to that in Paimon,
for example, users
+ // will only need to set the configuration 'catalog-backend' in Gravitino
and Gravitino will
+ // change it to `catalogType` automatically and pass it to Paimon.
+ public static final Map<String, String> GRAVITINO_CONFIG_TO_PAIMON;
+
+ static {
+ Map<String, String> map = new HashMap();
+ map.put(PaimonConstants.CATALOG_BACKEND, PaimonConstants.CATALOG_BACKEND);
+ map.put(PaimonConstants.GRAVITINO_JDBC_DRIVER,
PaimonConstants.GRAVITINO_JDBC_DRIVER);
+ map.put(PaimonConstants.GRAVITINO_JDBC_USER,
PaimonConstants.PAIMON_JDBC_USER);
+ map.put(PaimonConstants.GRAVITINO_JDBC_PASSWORD,
PaimonConstants.PAIMON_JDBC_PASSWORD);
+ map.put(PaimonConstants.URI, PaimonConstants.URI);
+ map.put(PaimonConstants.WAREHOUSE, PaimonConstants.WAREHOUSE);
+ map.put(PaimonConstants.CATALOG_BACKEND_NAME,
PaimonConstants.CATALOG_BACKEND_NAME);
+ // S3
+ map.put(S3Properties.GRAVITINO_S3_ENDPOINT, PaimonConstants.S3_ENDPOINT);
+ map.put(S3Properties.GRAVITINO_S3_ACCESS_KEY_ID,
PaimonConstants.S3_ACCESS_KEY);
+ map.put(S3Properties.GRAVITINO_S3_SECRET_ACCESS_KEY,
PaimonConstants.S3_SECRET_KEY);
+ // OSS
+ map.put(OSSProperties.GRAVITINO_OSS_ENDPOINT,
PaimonConstants.OSS_ENDPOINT);
+ map.put(OSSProperties.GRAVITINO_OSS_ACCESS_KEY_ID,
PaimonConstants.OSS_ACCESS_KEY);
+ map.put(OSSProperties.GRAVITINO_OSS_ACCESS_KEY_SECRET,
PaimonConstants.OSS_SECRET_KEY);
+ GRAVITINO_CONFIG_TO_PAIMON = Collections.unmodifiableMap(map);
+ }
+
+ /**
+ * Converts Gravitino properties to Paimon catalog properties, the common
transform logic shared
+ * by Spark connector, Gravitino Paimon catalog.
+ *
+ * @param gravitinoProperties a map of Gravitino configuration properties.
+ * @return a map containing Paimon catalog properties.
+ */
+ public static Map<String, String> toPaimonCatalogProperties(
+ Map<String, String> gravitinoProperties) {
+ Map<String, String> paimonProperties = new HashMap<>();
+ gravitinoProperties.forEach(
+ (key, value) -> {
+ if (GRAVITINO_CONFIG_TO_PAIMON.containsKey(key)) {
+ paimonProperties.put(GRAVITINO_CONFIG_TO_PAIMON.get(key), value);
+ }
+ });
+ return paimonProperties;
+ }
+
+ /**
+ * Get catalog backend name from Gravitino catalog properties.
+ *
+ * @param catalogProperties a map of Gravitino catalog properties.
+ * @return catalog backend name.
+ */
+ public static String getCatalogBackendName(Map<String, String>
catalogProperties) {
+ String backendName =
catalogProperties.get(PaimonConstants.CATALOG_BACKEND_NAME);
+ if (backendName != null) {
+ return backendName;
+ }
+
+ String catalogBackend =
catalogProperties.get(PaimonConstants.CATALOG_BACKEND);
+ return Optional.ofNullable(catalogBackend)
+ .map(s -> s.toLowerCase(Locale.ROOT))
+ .orElseThrow(
+ () ->
+ new UnsupportedOperationException(
+ String.format("Unsupported catalog backend: %s",
catalogBackend)));
+ }
+}
diff --git
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalogPropertiesMetadata.java
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalogPropertiesMetadata.java
index e3b59bff3..4c9dcb07a 100644
---
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalogPropertiesMetadata.java
+++
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalogPropertiesMetadata.java
@@ -45,20 +45,22 @@ import org.apache.gravitino.storage.S3Properties;
*/
public class PaimonCatalogPropertiesMetadata extends
BaseCatalogPropertiesMetadata {
- @VisibleForTesting public static final String GRAVITINO_CATALOG_BACKEND =
"catalog-backend";
- public static final String PAIMON_METASTORE = "metastore";
- public static final String WAREHOUSE = "warehouse";
- public static final String URI = "uri";
- public static final String GRAVITINO_JDBC_USER = "jdbc-user";
- public static final String PAIMON_JDBC_USER = "jdbc.user";
- public static final String GRAVITINO_JDBC_PASSWORD = "jdbc-password";
- public static final String PAIMON_JDBC_PASSWORD = "jdbc.password";
- public static final String GRAVITINO_JDBC_DRIVER = "jdbc-driver";
+ @VisibleForTesting
+ public static final String GRAVITINO_CATALOG_BACKEND =
PaimonConstants.CATALOG_BACKEND;
+
+ public static final String PAIMON_METASTORE = PaimonConstants.METASTORE;
+ public static final String WAREHOUSE = PaimonConstants.WAREHOUSE;
+ public static final String URI = PaimonConstants.URI;
+ public static final String GRAVITINO_JDBC_USER =
PaimonConstants.GRAVITINO_JDBC_USER;
+ public static final String PAIMON_JDBC_USER =
PaimonConstants.PAIMON_JDBC_USER;
+ public static final String GRAVITINO_JDBC_PASSWORD =
PaimonConstants.GRAVITINO_JDBC_PASSWORD;
+ public static final String PAIMON_JDBC_PASSWORD =
PaimonConstants.PAIMON_JDBC_PASSWORD;
+ public static final String GRAVITINO_JDBC_DRIVER =
PaimonConstants.GRAVITINO_JDBC_DRIVER;
// S3 properties needed by Paimon
- public static final String S3_ENDPOINT = "s3.endpoint";
- public static final String S3_ACCESS_KEY = "s3.access-key";
- public static final String S3_SECRET_KEY = "s3.secret-key";
+ public static final String S3_ENDPOINT = PaimonConstants.S3_ENDPOINT;
+ public static final String S3_ACCESS_KEY = PaimonConstants.S3_ACCESS_KEY;
+ public static final String S3_SECRET_KEY = PaimonConstants.S3_SECRET_KEY;
public static final Map<String, String> GRAVITINO_CONFIG_TO_PAIMON =
ImmutableMap.of(
diff --git
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonSchemaPropertiesMetadata.java
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonSchemaPropertiesMetadata.java
index 9a6ddb5a1..3da05099c 100644
---
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonSchemaPropertiesMetadata.java
+++
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonSchemaPropertiesMetadata.java
@@ -34,7 +34,7 @@ import org.apache.gravitino.connector.PropertyEntry;
*/
public class PaimonSchemaPropertiesMetadata extends BasePropertiesMetadata {
- public static final String COMMENT = "comment";
+ public static final String COMMENT = PaimonConstants.COMMENT;
private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA;
diff --git
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonTablePropertiesMetadata.java
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonTablePropertiesMetadata.java
index 671dd9d66..ad63df678 100644
---
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonTablePropertiesMetadata.java
+++
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonTablePropertiesMetadata.java
@@ -35,14 +35,14 @@ import org.apache.gravitino.connector.PropertyEntry;
*/
public class PaimonTablePropertiesMetadata extends BasePropertiesMetadata {
- public static final String COMMENT = "comment";
- public static final String OWNER = "owner";
- public static final String BUCKET_KEY = "bucket-key";
- public static final String MERGE_ENGINE = "merge-engine";
- public static final String SEQUENCE_FIELD = "sequence.field";
- public static final String ROWKIND_FIELD = "rowkind.field";
- public static final String PRIMARY_KEY = "primary-key";
- public static final String PARTITION = "partition";
+ public static final String COMMENT = PaimonConstants.COMMENT;
+ public static final String OWNER = PaimonConstants.OWNER;
+ public static final String BUCKET_KEY = PaimonConstants.BUCKET_KEY;
+ public static final String MERGE_ENGINE = PaimonConstants.MERGE_ENGINE;
+ public static final String SEQUENCE_FIELD = PaimonConstants.SEQUENCE_FIELD;
+ public static final String ROWKIND_FIELD = PaimonConstants.ROWKIND_FIELD;
+ public static final String PRIMARY_KEY = PaimonConstants.PRIMARY_KEY;
+ public static final String PARTITION = PaimonConstants.PARTITION;
private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA;
diff --git
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/storage/PaimonOSSFileSystemConfig.java
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/storage/PaimonOSSFileSystemConfig.java
index ad7fa26f3..7b703b5b7 100644
---
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/storage/PaimonOSSFileSystemConfig.java
+++
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/storage/PaimonOSSFileSystemConfig.java
@@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Config;
+import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants;
import org.apache.gravitino.config.ConfigBuilder;
import org.apache.gravitino.config.ConfigConstants;
import org.apache.gravitino.config.ConfigEntry;
@@ -29,9 +30,9 @@ import org.apache.gravitino.connector.PropertyEntry;
public class PaimonOSSFileSystemConfig extends Config {
// OSS related properties
- public static final String OSS_ENDPOINT = "fs.oss.endpoint";
- public static final String OSS_ACCESS_KEY = "fs.oss.accessKeyId";
- public static final String OSS_SECRET_KEY = "fs.oss.accessKeySecret";
+ public static final String OSS_ENDPOINT = PaimonConstants.OSS_ENDPOINT;
+ public static final String OSS_ACCESS_KEY = PaimonConstants.OSS_ACCESS_KEY;
+ public static final String OSS_SECRET_KEY = PaimonConstants.OSS_SECRET_KEY;
public PaimonOSSFileSystemConfig(Map<String, String> properties) {
super(false);
diff --git
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/storage/PaimonS3FileSystemConfig.java
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/storage/PaimonS3FileSystemConfig.java
index 4184fcc06..6588e4a52 100644
---
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/storage/PaimonS3FileSystemConfig.java
+++
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/storage/PaimonS3FileSystemConfig.java
@@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Config;
+import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants;
import org.apache.gravitino.config.ConfigBuilder;
import org.apache.gravitino.config.ConfigConstants;
import org.apache.gravitino.config.ConfigEntry;
@@ -29,9 +30,9 @@ import org.apache.gravitino.connector.PropertyEntry;
public class PaimonS3FileSystemConfig extends Config {
// S3 related properties
- public static final String S3_ENDPOINT = "s3.endpoint";
- public static final String S3_ACCESS_KEY = "s3.access-key";
- public static final String S3_SECRET_KEY = "s3.secret-key";
+ public static final String S3_ENDPOINT = PaimonConstants.S3_ENDPOINT;
+ public static final String S3_ACCESS_KEY = PaimonConstants.S3_ACCESS_KEY;
+ public static final String S3_SECRET_KEY = PaimonConstants.S3_SECRET_KEY;
public PaimonS3FileSystemConfig(Map<String, String> properties) {
super(false);
diff --git a/docs/lakehouse-paimon-catalog.md b/docs/lakehouse-paimon-catalog.md
index d53ad4827..b67fe37db 100644
--- a/docs/lakehouse-paimon-catalog.md
+++ b/docs/lakehouse-paimon-catalog.md
@@ -29,23 +29,24 @@ Builds with Apache Paimon `0.8.0`.
### Catalog properties
-| Property name | Description
| Default value | Required
| Since Version |
-|----------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------|
-| `catalog-backend` | Catalog backend of
Gravitino Paimon catalog. Supports `filesystem`, `jdbc` and `hive`.
| (none) | Yes
| 0.6.0-incubating |
-| `uri` | The URI configuration
of the Paimon catalog. `thrift://127.0.0.1:9083` or
`jdbc:postgresql://127.0.0.1:5432/db_name` or
`jdbc:mysql://127.0.0.1:3306/metastore_db`. It is optional for
`FilesystemCatalog`. | (none) | required if the value of
`catalog-backend` is not `filesystem`.
| 0.6.0-incubating
|
-| `warehouse` | Warehouse directory of
catalog. `file:///user/hive/warehouse-paimon/` for local fs,
`hdfs://namespace/hdfs/path` for HDFS , `s3://{bucket-name}/path/` for S3 or
`oss://{bucket-name}/path` for Aliyun OSS | (none) | Yes
|
0.6.0-incubating |
-| `authentication.type` | The type of
authentication for Paimon catalog backend, currently Gravitino only supports
`Kerberos` and `simple`.
| `simple` | No
| 0.6.0-incubating |
-| `hive.metastore.sasl.enabled` | Whether to enable SASL
authentication protocol when connect to Kerberos Hive metastore. This is a raw
Hive configuration
| `false` | No, This value should be true in most
case(Some will use SSL protocol, but it rather rare) if the value of
`gravitino.iceberg-rest.authentication.type` is Kerberos. | 0.6.0-incubating |
-| `authentication.kerberos.principal` | The principal of the
Kerberos authentication.
| (none) | required if the value of
`authentication.type` is Kerberos.
| 0.6.0-incubating
|
-| `authentication.kerberos.keytab-uri` | The URI of The keytab
for the Kerberos authentication.
| (none) | required if the value of
`authentication.type` is Kerberos.
| 0.6.0-incubating
|
-| `authentication.kerberos.check-interval-sec` | The check interval of
Kerberos credential for Paimon catalog.
| 60 | No
| 0.6.0-incubating |
-| `authentication.kerberos.keytab-fetch-timeout-sec` | The fetch timeout of
retrieving Kerberos keytab from `authentication.kerberos.keytab-uri`.
| 60 | No
| 0.6.0-incubating |
-| `oss-endpoint` | The endpoint of the
Aliyun OSS.
| (none) | required if the value of `warehouse`
is a OSS path
| 0.7.0-incubating |
-| `oss-access-key-id` | The access key of the
Aliyun OSS.
| (none) | required if the value of `warehouse` is
a OSS path
| 0.7.0-incubating |
-| `oss-accesss-key-secret` | The secret key the
Aliyun OSS.
| (none) | required if the value of `warehouse`
is a OSS path
| 0.7.0-incubating |
-| `s3-endpoint` | The endpoint of the AWS
S3.
| (none) | required if the value of `warehouse` is a
S3 path
| 0.7.0-incubating |
-| `s3-access-key-id` | The access key of the
AWS S3.
| (none) | required if the value of `warehouse` is
a S3 path
| 0.7.0-incubating |
-| `s3-secret-access-key` | The secret key of the
AWS S3.
| (none) | required if the value of `warehouse` is
a S3 path
| 0.7.0-incubating |
+| Property name | Description
| Default value
| Required
[...]
+|----------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
+| `catalog-backend` | Catalog backend of
Gravitino Paimon catalog. Supports `filesystem`, `jdbc` and `hive`.
| (none)
| Yes
[...]
+| `uri` | The URI configuration
of the Paimon catalog. `thrift://127.0.0.1:9083` or
`jdbc:postgresql://127.0.0.1:5432/db_name` or
`jdbc:mysql://127.0.0.1:3306/metastore_db`. It is optional for
`FilesystemCatalog`. | (none)
| required if the value of `catalog-backend` is not
`filesystem`.
[...]
+| `warehouse` | Warehouse directory of
catalog. `file:///user/hive/warehouse-paimon/` for local fs,
`hdfs://namespace/hdfs/path` for HDFS , `s3://{bucket-name}/path/` for S3 or
`oss://{bucket-name}/path` for Aliyun OSS | (none)
| Yes
[...]
+| `catalog-backend-name` | The catalog name passed
to underlying Paimon catalog backend.
| The property value of `catalog-backend`, like `jdbc` for
JDBC catalog backend. | No
[...]
+| `authentication.type` | The type of
authentication for Paimon catalog backend, currently Gravitino only supports
`Kerberos` and `simple`.
| `simple`
| No
[...]
+| `hive.metastore.sasl.enabled` | Whether to enable SASL
authentication protocol when connect to Kerberos Hive metastore. This is a raw
Hive configuration
| `false`
| No, This value should be true in most case(Some will
use SSL protocol, but it rather rare) if the value of
`gravitino.iceberg-rest.authentication.type [...]
+| `authentication.kerberos.principal` | The principal of the
Kerberos authentication.
| (none)
| required if the value of `authentication.type` is
Kerberos.
[...]
+| `authentication.kerberos.keytab-uri` | The URI of The keytab
for the Kerberos authentication.
| (none)
| required if the value of `authentication.type` is
Kerberos.
[...]
+| `authentication.kerberos.check-interval-sec` | The check interval of
Kerberos credential for Paimon catalog.
| 60
| No
[...]
+| `authentication.kerberos.keytab-fetch-timeout-sec` | The fetch timeout of
retrieving Kerberos keytab from `authentication.kerberos.keytab-uri`.
| 60
| No
[...]
+| `oss-endpoint` | The endpoint of the
Aliyun OSS.
| (none)
| required if the value of `warehouse` is a OSS path
[...]
+| `oss-access-key-id` | The access key of the
Aliyun OSS.
| (none)
| required if the value of `warehouse` is a OSS path
[...]
+| `oss-accesss-key-secret` | The secret key the
Aliyun OSS.
| (none)
| required if the value of `warehouse` is a OSS path
[...]
+| `s3-endpoint` | The endpoint of the AWS
S3.
| (none)
| required if the value of `warehouse` is a S3 path
[...]
+| `s3-access-key-id` | The access key of the
AWS S3.
| (none)
| required if the value of `warehouse` is a S3 path
[...]
+| `s3-secret-access-key` | The secret key of the
AWS S3.
| (none)
| required if the value of `warehouse` is a S3 path
[...]
:::note
If you want to use the `oss` or `s3` warehouse, you need to place related jars
in the `catalogs/lakehouse-paimon/lib` directory, more information can be found
in the [Paimon S3](https://paimon.apache.org/docs/master/filesystems/s3/).
diff --git a/spark-connector/spark-common/build.gradle.kts
b/spark-connector/spark-common/build.gradle.kts
index 7f3c66aa6..06e0077d2 100644
--- a/spark-connector/spark-common/build.gradle.kts
+++ b/spark-connector/spark-common/build.gradle.kts
@@ -31,6 +31,7 @@ val scalaVersion: String = project.properties["scalaVersion"]
as? String ?: extr
val sparkVersion: String = libs.versions.spark33.get()
val sparkMajorVersion: String = sparkVersion.substringBeforeLast(".")
val icebergVersion: String = libs.versions.iceberg4spark.get()
+val paimonVersion: String = libs.versions.paimon.get()
// kyuubi hive connector for Spark 3.3 doesn't support scala 2.13
val kyuubiVersion: String = libs.versions.kyuubi4spark34.get()
val scalaJava8CompatVersion: String = libs.versions.scala.java.compat.get()
@@ -43,6 +44,9 @@ dependencies {
compileOnly(project(":clients:client-java-runtime", configuration =
"shadow"))
compileOnly("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
compileOnly("org.apache.kyuubi:kyuubi-spark-connector-hive_$scalaVersion:$kyuubiVersion")
+
compileOnly("org.apache.paimon:paimon-spark-$sparkMajorVersion:$paimonVersion")
{
+ exclude("org.apache.spark")
+ }
compileOnly("org.apache.spark:spark-catalyst_$scalaVersion:$sparkVersion")
compileOnly("org.apache.spark:spark-core_$scalaVersion:$sparkVersion")
@@ -114,6 +118,9 @@ dependencies {
testImplementation("org.apache.iceberg:iceberg-core:$icebergVersion")
testImplementation("org.apache.iceberg:iceberg-hive-metastore:$icebergVersion")
testImplementation("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
+
testImplementation("org.apache.paimon:paimon-spark-$sparkMajorVersion:$paimonVersion")
{
+ exclude("org.apache.spark")
+ }
testImplementation("org.apache.kyuubi:kyuubi-spark-connector-hive_$scalaVersion:$kyuubiVersion")
// include spark-sql,spark-catalyst,hive-common,hdfs-client
testImplementation("org.apache.spark:spark-hive_$scalaVersion:$sparkVersion") {
@@ -123,6 +130,9 @@ dependencies {
exclude("org.glassfish.jersey.inject")
}
testImplementation("org.scala-lang.modules:scala-collection-compat_$scalaVersion:$scalaCollectionCompatVersion")
+
testImplementation("org.apache.spark:spark-catalyst_$scalaVersion:$sparkVersion")
+ testImplementation("org.apache.spark:spark-core_$scalaVersion:$sparkVersion")
+ testImplementation("org.apache.spark:spark-sql_$scalaVersion:$sparkVersion")
testRuntimeOnly(libs.junit.jupiter.engine)
}
diff --git
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/BaseCatalog.java
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/BaseCatalog.java
index 2201bd222..5706895ca 100644
---
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/BaseCatalog.java
+++
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/BaseCatalog.java
@@ -76,11 +76,11 @@ public abstract class BaseCatalog implements TableCatalog,
SupportsNamespaces {
protected TableCatalog sparkCatalog;
protected PropertiesConverter propertiesConverter;
protected SparkTransformConverter sparkTransformConverter;
+ // The Gravitino catalog client to do schema operations.
+ protected Catalog gravitinoCatalogClient;
private SparkTypeConverter sparkTypeConverter;
private SparkTableChangeConverter sparkTableChangeConverter;
- // The Gravitino catalog client to do schema operations.
- private Catalog gravitinoCatalogClient;
private String catalogName;
private final GravitinoCatalogManager gravitinoCatalogManager;
diff --git
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalog.java
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalog.java
new file mode 100644
index 000000000..86ca680c4
--- /dev/null
+++
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalog.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.spark.connector.paimon;
+
+import java.util.Map;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.catalog.lakehouse.paimon.PaimonPropertiesUtils;
+import org.apache.gravitino.spark.connector.PropertiesConverter;
+import org.apache.gravitino.spark.connector.SparkTransformConverter;
+import org.apache.gravitino.spark.connector.SparkTypeConverter;
+import org.apache.gravitino.spark.connector.catalog.BaseCatalog;
+import org.apache.paimon.spark.SparkCatalog;
+import org.apache.paimon.spark.SparkTable;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+public class GravitinoPaimonCatalog extends BaseCatalog {
+
+ @Override
+ protected TableCatalog createAndInitSparkCatalog(
+ String name, CaseInsensitiveStringMap options, Map<String, String>
properties) {
+ String catalogBackendName =
PaimonPropertiesUtils.getCatalogBackendName(properties);
+ TableCatalog paimonCatalog = new SparkCatalog();
+ Map<String, String> all =
+ getPropertiesConverter().toSparkCatalogProperties(options, properties);
+ paimonCatalog.initialize(catalogBackendName, new
CaseInsensitiveStringMap(all));
+ return paimonCatalog;
+ }
+
+ @Override
+ protected Table createSparkTable(
+ Identifier identifier,
+ org.apache.gravitino.rel.Table gravitinoTable,
+ Table sparkTable,
+ TableCatalog sparkCatalog,
+ PropertiesConverter propertiesConverter,
+ SparkTransformConverter sparkTransformConverter,
+ SparkTypeConverter sparkTypeConverter) {
+ return new SparkPaimonTable(
+ identifier,
+ gravitinoTable,
+ (SparkTable) sparkTable,
+ propertiesConverter,
+ sparkTransformConverter,
+ sparkTypeConverter);
+ }
+
+ @Override
+ protected PropertiesConverter getPropertiesConverter() {
+ return PaimonPropertiesConverter.getInstance();
+ }
+
+ @Override
+ protected SparkTransformConverter getSparkTransformConverter() {
+ return new SparkTransformConverter(true);
+ }
+
+ @Override
+ public boolean dropTable(Identifier ident) {
+ sparkCatalog.invalidateTable(ident);
+ return gravitinoCatalogClient
+ .asTableCatalog()
+ .purgeTable(NameIdentifier.of(getDatabase(ident), ident.name()));
+ }
+}
diff --git
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/PaimonPropertiesConstants.java
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/PaimonPropertiesConstants.java
new file mode 100644
index 000000000..915308ae8
--- /dev/null
+++
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/PaimonPropertiesConstants.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.spark.connector.paimon;
+
+import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants;
+
+public class PaimonPropertiesConstants {
+
+ public static final String GRAVITINO_PAIMON_CATALOG_BACKEND =
PaimonConstants.CATALOG_BACKEND;
+ static final String PAIMON_CATALOG_METASTORE = PaimonConstants.METASTORE;
+
+ public static final String GRAVITINO_PAIMON_CATALOG_WAREHOUSE =
PaimonConstants.WAREHOUSE;
+ static final String PAIMON_CATALOG_WAREHOUSE = PaimonConstants.WAREHOUSE;
+
+ public static final String GRAVITINO_PAIMON_CATALOG_URI =
PaimonConstants.URI;
+ static final String PAIMON_CATALOG_URI = PaimonConstants.URI;
+ static final String GRAVITINO_PAIMON_CATALOG_JDBC_USER =
PaimonConstants.GRAVITINO_JDBC_USER;
+ static final String PAIMON_CATALOG_JDBC_USER =
PaimonConstants.PAIMON_JDBC_USER;
+
+ static final String GRAVITINO_PAIMON_CATALOG_JDBC_PASSWORD =
+ PaimonConstants.GRAVITINO_JDBC_PASSWORD;
+ static final String PAIMON_CATALOG_JDBC_PASSWORD =
PaimonConstants.PAIMON_JDBC_PASSWORD;
+
+ public static final String PAIMON_CATALOG_BACKEND_HIVE = "hive";
+ static final String GRAVITINO_PAIMON_CATALOG_BACKEND_HIVE = "hive";
+
+ static final String GRAVITINO_PAIMON_CATALOG_BACKEND_JDBC = "jdbc";
+ static final String PAIMON_CATALOG_BACKEND_JDBC = "jdbc";
+
+ public static final String PAIMON_CATALOG_BACKEND_FILESYSTEM = "filesystem";
+ static final String GRAVITINO_PAIMON_CATALOG_BACKEND_FILESYSTEM =
"filesystem";
+
+ public static final String PAIMON_TABLE_LOCATION = "path";
+}
diff --git
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/PaimonPropertiesConverter.java
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/PaimonPropertiesConverter.java
new file mode 100644
index 000000000..f713ca89d
--- /dev/null
+++
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/PaimonPropertiesConverter.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.spark.connector.paimon;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants;
+import org.apache.gravitino.catalog.lakehouse.paimon.PaimonPropertiesUtils;
+import org.apache.gravitino.spark.connector.PropertiesConverter;
+
+public class PaimonPropertiesConverter implements PropertiesConverter {
+
+ public static class PaimonPropertiesConverterHolder {
+ private static final PaimonPropertiesConverter INSTANCE = new
PaimonPropertiesConverter();
+ }
+
+ private PaimonPropertiesConverter() {}
+
+ public static PaimonPropertiesConverter getInstance() {
+ return PaimonPropertiesConverter.PaimonPropertiesConverterHolder.INSTANCE;
+ }
+
+ @Override
+ public Map<String, String> toSparkCatalogProperties(Map<String, String>
properties) {
+ Preconditions.checkArgument(properties != null, "Paimon Catalog properties
should not be null");
+ Map<String, String> all =
PaimonPropertiesUtils.toPaimonCatalogProperties(properties);
+ String catalogBackend =
all.remove(PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_BACKEND);
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(catalogBackend),
+ String.format(
+ "%s should not be empty",
PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_BACKEND));
+ all.put(PaimonPropertiesConstants.PAIMON_CATALOG_METASTORE,
catalogBackend);
+ return all;
+ }
+
+ @Override
+ public Map<String, String> toGravitinoTableProperties(Map<String, String>
properties) {
+ HashMap<String, String> gravitinoTableProperties = new
HashMap<>(properties);
+ // The owner property of Paimon is a reserved property, so we need to
remove it.
+ gravitinoTableProperties.remove(PaimonConstants.OWNER);
+ return gravitinoTableProperties;
+ }
+
+ @Override
+ public Map<String, String> toSparkTableProperties(Map<String, String>
properties) {
+ return new HashMap<>(properties);
+ }
+}
diff --git
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/SparkPaimonTable.java
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/SparkPaimonTable.java
new file mode 100644
index 000000000..f1db29b71
--- /dev/null
+++
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/SparkPaimonTable.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.spark.connector.paimon;
+
+import java.util.Map;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.spark.connector.PropertiesConverter;
+import org.apache.gravitino.spark.connector.SparkTransformConverter;
+import org.apache.gravitino.spark.connector.SparkTypeConverter;
+import org.apache.gravitino.spark.connector.utils.GravitinoTableInfoHelper;
+import org.apache.paimon.spark.SparkTable;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+public class SparkPaimonTable extends SparkTable {
+
+ private GravitinoTableInfoHelper gravitinoTableInfoHelper;
+ private org.apache.spark.sql.connector.catalog.Table sparkTable;
+
+ public SparkPaimonTable(
+ Identifier identifier,
+ Table gravitinoTable,
+ SparkTable sparkTable,
+ PropertiesConverter propertiesConverter,
+ SparkTransformConverter sparkTransformConverter,
+ SparkTypeConverter sparkTypeConverter) {
+ super(sparkTable.getTable());
+ this.gravitinoTableInfoHelper =
+ new GravitinoTableInfoHelper(
+ true,
+ identifier,
+ gravitinoTable,
+ propertiesConverter,
+ sparkTransformConverter,
+ sparkTypeConverter);
+ this.sparkTable = sparkTable;
+ }
+
+ @Override
+ public String name() {
+ return gravitinoTableInfoHelper.name();
+ }
+
+ @Override
+ @SuppressWarnings("deprecation")
+ public StructType schema() {
+ return gravitinoTableInfoHelper.schema();
+ }
+
+ @Override
+ public Map<String, String> properties() {
+ return gravitinoTableInfoHelper.properties();
+ }
+
+ @Override
+ public Transform[] partitioning() {
+ return gravitinoTableInfoHelper.partitioning();
+ }
+
+ /**
+ * If using SparkPaimonTable not SparkTable, we must extract snapshotId or
branchName using the
+ * Paimon specific logic. It's hard to maintenance.
+ */
+ @Override
+ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
+ return ((SparkTable) sparkTable).newScanBuilder(options);
+ }
+}
diff --git
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/version/CatalogNameAdaptor.java
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/version/CatalogNameAdaptor.java
index 8141c799b..9392feac2 100644
---
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/version/CatalogNameAdaptor.java
+++
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/version/CatalogNameAdaptor.java
@@ -27,15 +27,24 @@ import org.apache.spark.util.VersionUtils$;
public class CatalogNameAdaptor {
private static final Map<String, String> catalogNames =
ImmutableMap.of(
- "hive-3.3",
"org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark33",
- "hive-3.4",
"org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark34",
- "hive-3.5",
"org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark35",
+ "hive-3.3",
+
"org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark33",
+ "hive-3.4",
+
"org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark34",
+ "hive-3.5",
+
"org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark35",
"lakehouse-iceberg-3.3",
-
"org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark33",
+
"org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark33",
"lakehouse-iceberg-3.4",
-
"org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark34",
+
"org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark34",
"lakehouse-iceberg-3.5",
-
"org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark35");
+
"org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark35",
+ "lakehouse-paimon-3.3",
+
"org.apache.gravitino.spark.connector.paimon.GravitinoPaimonCatalogSpark33",
+ "lakehouse-paimon-3.4",
+
"org.apache.gravitino.spark.connector.paimon.GravitinoPaimonCatalogSpark34",
+ "lakehouse-paimon-3.5",
+
"org.apache.gravitino.spark.connector.paimon.GravitinoPaimonCatalogSpark35");
private static String sparkVersion() {
return package$.MODULE$.SPARK_VERSION();
diff --git
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java
index 63e4801ef..c7517a3bf 100644
---
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java
+++
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java
@@ -117,6 +117,8 @@ public abstract class SparkCommonIT extends SparkEnvIT {
protected abstract boolean supportsSchemaEvolution();
+ protected abstract boolean supportsReplaceColumns();
+
// Use a custom database not the original default database because
SparkCommonIT couldn't
// read&write data to tables in default database. The main reason is default
database location is
// determined by `hive.metastore.warehouse.dir` in hive-site.xml which is
local HDFS address
@@ -146,7 +148,7 @@ public abstract class SparkCommonIT extends SparkEnvIT {
throw e;
}
sql("USE " + getCatalogName());
- createDatabaseIfNotExists(getDefaultDatabase());
+ createDatabaseIfNotExists(getDefaultDatabase(), getProvider());
}
@BeforeEach
@@ -187,7 +189,7 @@ public abstract class SparkCommonIT extends SparkEnvIT {
}
@Test
- void testCreateAndLoadSchema() {
+ protected void testCreateAndLoadSchema() {
String testDatabaseName = "t_create1";
dropDatabaseIfExists(testDatabaseName);
sql("CREATE DATABASE " + testDatabaseName + " WITH DBPROPERTIES
(ID=001);");
@@ -216,7 +218,7 @@ public abstract class SparkCommonIT extends SparkEnvIT {
}
@Test
- void testAlterSchema() {
+ protected void testAlterSchema() {
String testDatabaseName = "t_alter";
dropDatabaseIfExists(testDatabaseName);
sql("CREATE DATABASE " + testDatabaseName + " WITH DBPROPERTIES
(ID=001);");
@@ -240,6 +242,7 @@ public abstract class SparkCommonIT extends SparkEnvIT {
@Test
void testDropSchema() {
String testDatabaseName = "t_drop";
+ dropDatabaseIfExists(testDatabaseName);
Set<String> databases = getDatabases();
Assertions.assertFalse(databases.contains(testDatabaseName));
@@ -277,7 +280,7 @@ public abstract class SparkCommonIT extends SparkEnvIT {
// test db.table as table identifier
String databaseName = "db1";
String tableName = "table1";
- createDatabaseIfNotExists(databaseName);
+ createDatabaseIfNotExists(databaseName, getProvider());
String tableIdentifier = String.join(".", databaseName, tableName);
dropTableIfExists(tableIdentifier);
@@ -291,7 +294,7 @@ public abstract class SparkCommonIT extends SparkEnvIT {
// use db then create table with table name
databaseName = "db2";
tableName = "table2";
- createDatabaseIfNotExists(databaseName);
+ createDatabaseIfNotExists(databaseName, getProvider());
sql("USE " + databaseName);
dropTableIfExists(tableName);
@@ -379,7 +382,7 @@ public abstract class SparkCommonIT extends SparkEnvIT {
String database = "db_list";
String table3 = "list3";
String table4 = "list4";
- createDatabaseIfNotExists(database);
+ createDatabaseIfNotExists(database, getProvider());
dropTableIfExists(String.join(".", database, table3));
dropTableIfExists(String.join(".", database, table4));
createSimpleTable(String.join(".", database, table3));
@@ -550,7 +553,8 @@ public abstract class SparkCommonIT extends SparkEnvIT {
}
@Test
- void testAlterTableReplaceColumns() {
+ @EnabledIf("supportsReplaceColumns")
+ protected void testAlterTableReplaceColumns() {
String tableName = "test_replace_columns_table";
dropTableIfExists(tableName);
@@ -563,7 +567,7 @@ public abstract class SparkCommonIT extends SparkEnvIT {
sql(
String.format(
- "ALTER TABLE %S REPLACE COLUMNS (id int COMMENT 'new comment',
name2 string, age long);",
+ "ALTER TABLE %s REPLACE COLUMNS (id int COMMENT 'new comment',
name2 string, age long);",
tableName));
ArrayList<SparkColumnInfo> updateColumns = new ArrayList<>();
// change comment for id
diff --git
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java
index c543d8281..b95882a0d 100644
---
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java
+++
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java
@@ -79,6 +79,11 @@ public abstract class SparkHiveCatalogIT extends
SparkCommonIT {
return false;
}
+ @Override
+ protected boolean supportsReplaceColumns() {
+ return true;
+ }
+
@Test
void testCreateHiveFormatPartitionTable() {
String tableName = "hive_partition_table";
diff --git
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java
index 52f4abf3a..f5fd337a1 100644
---
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java
+++
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java
@@ -104,6 +104,11 @@ public abstract class SparkIcebergCatalogIT extends
SparkCommonIT {
return true;
}
+ @Override
+ protected boolean supportsReplaceColumns() {
+ return true;
+ }
+
@Override
protected String getTableLocation(SparkTableInfo table) {
return String.join(File.separator, table.getTableLocation(), "data");
diff --git
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT.java
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT.java
new file mode 100644
index 000000000..3d4a3257a
--- /dev/null
+++
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.spark.connector.integration.test.paimon;
+
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.gravitino.spark.connector.paimon.PaimonPropertiesConstants;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+/** This class use Apache Paimon FilesystemCatalog for backend catalog. */
+@Tag("gravitino-docker-test")
+public abstract class SparkPaimonCatalogFilesystemBackendIT extends
SparkPaimonCatalogIT {
+
+ @Override
+ protected Map<String, String> getCatalogConfigs() {
+ Map<String, String> catalogProperties = Maps.newHashMap();
+ catalogProperties.put(
+ PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_BACKEND,
+ PaimonPropertiesConstants.PAIMON_CATALOG_BACKEND_FILESYSTEM);
+
catalogProperties.put(PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_WAREHOUSE,
warehouse);
+ return catalogProperties;
+ }
+
+ @Test
+ @Override
+ protected void testCreateAndLoadSchema() {
+ String testDatabaseName = "t_create1";
+ dropDatabaseIfExists(testDatabaseName);
+ sql("CREATE DATABASE " + testDatabaseName + " WITH DBPROPERTIES
(ID=001);");
+ Map<String, String> databaseMeta = getDatabaseMetadata(testDatabaseName);
+ // The database of the Paimon filesystem backend do not store any
properties.
+ Assertions.assertFalse(databaseMeta.containsKey("ID"));
+ }
+
+ @Test
+ @Override
+ protected void testAlterSchema() {
+ String testDatabaseName = "t_alter";
+ dropDatabaseIfExists(testDatabaseName);
+ sql("CREATE DATABASE " + testDatabaseName + " WITH DBPROPERTIES
(ID=001);");
+ Map<String, String> databaseMeta = getDatabaseMetadata(testDatabaseName);
+ // The database of the Paimon filesystem backend do not store any
properties.
+ Assertions.assertFalse(databaseMeta.containsKey("ID"));
+
+ // The Paimon filesystem backend do not support alter database operation.
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () ->
+ sql(
+ String.format(
+ "ALTER DATABASE %s SET DBPROPERTIES ('ID'='002')",
testDatabaseName)));
+ }
+}
diff --git
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogIT.java
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogIT.java
new file mode 100644
index 000000000..c77a4642e
--- /dev/null
+++
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogIT.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.spark.connector.integration.test.paimon;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.spark.connector.integration.test.SparkCommonIT;
+import
org.apache.gravitino.spark.connector.integration.test.util.SparkTableInfo;
+import
org.apache.gravitino.spark.connector.integration.test.util.SparkTableInfoChecker;
+import org.apache.gravitino.spark.connector.paimon.PaimonPropertiesConstants;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.sql.types.DataTypes;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public abstract class SparkPaimonCatalogIT extends SparkCommonIT {
+
+ @Override
+ protected String getCatalogName() {
+ return "paimon";
+ }
+
+ @Override
+ protected String getProvider() {
+ return "lakehouse-paimon";
+ }
+
+ @Override
+ protected boolean supportsSparkSQLClusteredBy() {
+ return false;
+ }
+
+ @Override
+ protected boolean supportsPartition() {
+ return true;
+ }
+
+ @Override
+ protected boolean supportsDelete() {
+ return false;
+ }
+
+ @Override
+ protected boolean supportsSchemaEvolution() {
+ return true;
+ }
+
+ @Override
+ protected boolean supportsReplaceColumns() {
+ // Paimon doesn't support replace columns, because it doesn't support drop
all fields in table.
+ // And `ALTER TABLE REPLACE COLUMNS` statement will remove all existing
columns at first and
+ // then adds the new set of columns.
+ return false;
+ }
+
+ @Override
+ protected String getTableLocation(SparkTableInfo table) {
+ Map<String, String> tableProperties = table.getTableProperties();
+ return
tableProperties.get(PaimonPropertiesConstants.PAIMON_TABLE_LOCATION);
+ }
+
+ @Test
+ void testPaimonPartitions() {
+ String partitionPathString = "name=a/address=beijing";
+
+ String tableName = "test_paimon_partition_table";
+ dropTableIfExists(tableName);
+ String createTableSQL = getCreatePaimonSimpleTableString(tableName);
+ createTableSQL = createTableSQL + " PARTITIONED BY (name, address);";
+ sql(createTableSQL);
+ SparkTableInfo tableInfo = getTableInfo(tableName);
+ SparkTableInfoChecker checker =
+ SparkTableInfoChecker.create()
+ .withName(tableName)
+ .withColumns(getPaimonSimpleTableColumn())
+ .withIdentifyPartition(Collections.singletonList("name"))
+ .withIdentifyPartition(Collections.singletonList("address"));
+ checker.check(tableInfo);
+
+ String insertData = String.format("INSERT into %s
values(2,'a','beijing');", tableName);
+ sql(insertData);
+ List<String> queryResult = getTableData(tableName);
+ Assertions.assertEquals(1, queryResult.size());
+ Assertions.assertEquals("2,a,beijing", queryResult.get(0));
+ Path partitionPath = new Path(getTableLocation(tableInfo),
partitionPathString);
+ checkDirExists(partitionPath);
+ }
+
+ private String getCreatePaimonSimpleTableString(String tableName) {
+ return String.format(
+ "CREATE TABLE %s (id INT COMMENT 'id comment', name STRING COMMENT '',
address STRING COMMENT '') USING paimon",
+ tableName);
+ }
+
+ private List<SparkTableInfo.SparkColumnInfo> getPaimonSimpleTableColumn() {
+ return Arrays.asList(
+ SparkTableInfo.SparkColumnInfo.of("id", DataTypes.IntegerType, "id
comment"),
+ SparkTableInfo.SparkColumnInfo.of("name", DataTypes.StringType, ""),
+ SparkTableInfo.SparkColumnInfo.of("address", DataTypes.StringType,
""));
+ }
+}
diff --git
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkTableInfo.java
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkTableInfo.java
index 38b21ddf0..077936c29 100644
---
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkTableInfo.java
+++
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkTableInfo.java
@@ -31,6 +31,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.spark.connector.ConnectorConstants;
import org.apache.gravitino.spark.connector.hive.SparkHiveTable;
import org.apache.gravitino.spark.connector.iceberg.SparkIcebergTable;
+import org.apache.gravitino.spark.connector.paimon.SparkPaimonTable;
import org.apache.spark.sql.connector.catalog.SupportsMetadataColumns;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCatalog;
@@ -71,6 +72,10 @@ public class SparkTableInfo {
return tableProperties.get(TableCatalog.PROP_LOCATION);
}
+ public Map<String, String> getTableProperties() {
+ return tableProperties;
+ }
+
// Include database name and table name
public String getTableIdentifier() {
if (StringUtils.isNotBlank(database)) {
@@ -186,6 +191,8 @@ public class SparkTableInfo {
return ((SparkHiveTable) baseTable).schema();
} else if (baseTable instanceof SparkIcebergTable) {
return ((SparkIcebergTable) baseTable).schema();
+ } else if (baseTable instanceof SparkPaimonTable) {
+ return ((SparkPaimonTable) baseTable).schema();
} else {
throw new IllegalArgumentException(
"Doesn't support Spark table: " + baseTable.getClass().getName());
diff --git
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkUtilIT.java
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkUtilIT.java
index 646f41484..ed7d2085f 100644
---
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkUtilIT.java
+++
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkUtilIT.java
@@ -74,10 +74,13 @@ public abstract class SparkUtilIT extends BaseIT {
// Specify Location explicitly because the default location is local HDFS,
Spark will expand the
// location to HDFS.
- protected void createDatabaseIfNotExists(String database) {
- sql(
- String.format(
- "CREATE DATABASE IF NOT EXISTS %s LOCATION '/user/hive/%s'",
database, database));
+ // However, Paimon does not support create a database with a specified
location.
+ protected void createDatabaseIfNotExists(String database, String provider) {
+ String locationClause =
+ "lakehouse-paimon".equalsIgnoreCase(provider)
+ ? ""
+ : String.format("LOCATION '/user/hive/%s'", database);
+ sql(String.format("CREATE DATABASE IF NOT EXISTS %s %s", database,
locationClause));
}
protected Map<String, String> getDatabaseMetadata(String database) {
diff --git
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/paimon/TestPaimonPropertiesConverter.java
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/paimon/TestPaimonPropertiesConverter.java
new file mode 100644
index 000000000..a3a0e9128
--- /dev/null
+++
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/paimon/TestPaimonPropertiesConverter.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.spark.connector.paimon;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestPaimonPropertiesConverter {
+ private final PaimonPropertiesConverter paimonPropertiesConverter =
+ PaimonPropertiesConverter.getInstance();
+
+ @Test
+ void testCatalogPropertiesWithHiveBackend() {
+ Map<String, String> properties =
+ paimonPropertiesConverter.toSparkCatalogProperties(
+ ImmutableMap.of(
+ PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_BACKEND,
+
PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_BACKEND_HIVE,
+ PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_URI,
+ "hive-uri",
+ PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_WAREHOUSE,
+ "hive-warehouse",
+ "key1",
+ "value1"));
+ Assertions.assertEquals(
+ ImmutableMap.of(
+ PaimonPropertiesConstants.PAIMON_CATALOG_METASTORE,
+ PaimonPropertiesConstants.PAIMON_CATALOG_BACKEND_HIVE,
+ PaimonPropertiesConstants.PAIMON_CATALOG_URI,
+ "hive-uri",
+ PaimonPropertiesConstants.PAIMON_CATALOG_WAREHOUSE,
+ "hive-warehouse"),
+ properties);
+ }
+
+ @Test
+ void testCatalogPropertiesWithJdbcBackend() {
+ Map<String, String> properties =
+ paimonPropertiesConverter.toSparkCatalogProperties(
+ ImmutableMap.of(
+ PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_BACKEND,
+ PaimonPropertiesConstants.PAIMON_CATALOG_BACKEND_JDBC,
+ PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_URI,
+ "jdbc-uri",
+ PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_WAREHOUSE,
+ "jdbc-warehouse",
+ PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_JDBC_USER,
+ "user",
+
PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_JDBC_PASSWORD,
+ "passwd",
+ "key1",
+ "value1"));
+ Assertions.assertEquals(
+ ImmutableMap.of(
+ PaimonPropertiesConstants.PAIMON_CATALOG_METASTORE,
+ PaimonPropertiesConstants.PAIMON_CATALOG_BACKEND_JDBC,
+ PaimonPropertiesConstants.PAIMON_CATALOG_URI,
+ "jdbc-uri",
+ PaimonPropertiesConstants.PAIMON_CATALOG_WAREHOUSE,
+ "jdbc-warehouse",
+ PaimonPropertiesConstants.PAIMON_CATALOG_JDBC_USER,
+ "user",
+ PaimonPropertiesConstants.PAIMON_CATALOG_JDBC_PASSWORD,
+ "passwd"),
+ properties);
+ }
+
+ @Test
+ void testCatalogPropertiesWithFilesystemBackend() {
+ Map<String, String> properties =
+ paimonPropertiesConverter.toSparkCatalogProperties(
+ ImmutableMap.of(
+ PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_BACKEND,
+
PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_BACKEND_FILESYSTEM,
+ PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_WAREHOUSE,
+ "filesystem-warehouse",
+ "key1",
+ "value1"));
+ Assertions.assertEquals(
+ ImmutableMap.of(
+ PaimonPropertiesConstants.PAIMON_CATALOG_METASTORE,
+ PaimonPropertiesConstants.PAIMON_CATALOG_BACKEND_FILESYSTEM,
+ PaimonPropertiesConstants.PAIMON_CATALOG_WAREHOUSE,
+ "filesystem-warehouse"),
+ properties);
+ }
+}
diff --git a/spark-connector/v3.3/spark/build.gradle.kts
b/spark-connector/v3.3/spark/build.gradle.kts
index c4c417d62..66c65f863 100644
--- a/spark-connector/v3.3/spark/build.gradle.kts
+++ b/spark-connector/v3.3/spark/build.gradle.kts
@@ -31,6 +31,7 @@ val scalaVersion: String = project.properties["scalaVersion"]
as? String ?: extr
val sparkVersion: String = libs.versions.spark33.get()
val sparkMajorVersion: String = sparkVersion.substringBeforeLast(".")
val icebergVersion: String = libs.versions.iceberg4spark.get()
+val paimonVersion: String = libs.versions.paimon.get()
val kyuubiVersion: String = libs.versions.kyuubi4spark33.get()
val scalaJava8CompatVersion: String = libs.versions.scala.java.compat.get()
val scalaCollectionCompatVersion: String =
libs.versions.scala.collection.compat.get()
@@ -43,6 +44,9 @@ dependencies {
exclude("com.fasterxml.jackson")
}
compileOnly("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
+
compileOnly("org.apache.paimon:paimon-spark-$sparkMajorVersion:$paimonVersion")
{
+ exclude("org.apache.spark")
+ }
testImplementation(project(":api")) {
exclude("org.apache.logging.log4j")
@@ -122,6 +126,9 @@ dependencies {
testImplementation("org.apache.iceberg:iceberg-core:$icebergVersion")
testImplementation("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
testImplementation("org.apache.iceberg:iceberg-hive-metastore:$icebergVersion")
+
testImplementation("org.apache.paimon:paimon-spark-$sparkMajorVersion:$paimonVersion")
{
+ exclude("org.apache.spark")
+ }
testImplementation("org.apache.kyuubi:kyuubi-spark-connector-hive_$scalaVersion:$kyuubiVersion")
// include spark-sql,spark-catalyst,hive-common,hdfs-client
testImplementation("org.apache.spark:spark-hive_$scalaVersion:$sparkVersion") {
@@ -134,6 +141,9 @@ dependencies {
exclude("com.fasterxml.jackson.core")
}
testImplementation("org.scala-lang.modules:scala-collection-compat_$scalaVersion:$scalaCollectionCompatVersion")
+
testImplementation("org.apache.spark:spark-catalyst_$scalaVersion:$sparkVersion")
+ testImplementation("org.apache.spark:spark-core_$scalaVersion:$sparkVersion")
+ testImplementation("org.apache.spark:spark-sql_$scalaVersion:$sparkVersion")
testRuntimeOnly(libs.junit.jupiter.engine)
}
@@ -152,6 +162,7 @@ tasks.test {
dependsOn(":catalogs:catalog-lakehouse-iceberg:jar")
dependsOn(":catalogs:catalog-hive:jar")
dependsOn(":iceberg:iceberg-rest-server:jar")
+ dependsOn(":catalogs:catalog-lakehouse-paimon:jar")
}
}
diff --git
a/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
b/spark-connector/v3.3/spark/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalogSpark33.java
similarity index 52%
copy from
spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
copy to
spark-connector/v3.3/spark/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalogSpark33.java
index 1b0af02f8..2fef911a8 100644
---
a/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
+++
b/spark-connector/v3.3/spark/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalogSpark33.java
@@ -16,20 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.gravitino.spark.connector.version;
+package org.apache.gravitino.spark.connector.paimon;
-import org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark33;
-import
org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark33;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-public class TestCatalogNameAdaptor {
- @Test
- void testSpark33() {
- String hiveCatalogName = CatalogNameAdaptor.getCatalogName("hive");
- Assertions.assertEquals(GravitinoHiveCatalogSpark33.class.getName(),
hiveCatalogName);
-
- String icebergCatalogName =
CatalogNameAdaptor.getCatalogName("lakehouse-iceberg");
- Assertions.assertEquals(GravitinoIcebergCatalogSpark33.class.getName(),
icebergCatalogName);
- }
-}
+public class GravitinoPaimonCatalogSpark33 extends GravitinoPaimonCatalog {}
diff --git
a/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
b/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT33.java
similarity index 58%
copy from
spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
copy to
spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT33.java
index 5295e82fb..839b959c7 100644
---
a/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
+++
b/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT33.java
@@ -16,20 +16,20 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.gravitino.spark.connector.version;
+package org.apache.gravitino.spark.connector.integration.test.paimon;
-import org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark35;
-import
org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark35;
+import
org.apache.gravitino.spark.connector.paimon.GravitinoPaimonCatalogSpark33;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
-public class TestCatalogNameAdaptor {
+public class SparkPaimonCatalogFilesystemBackendIT33 extends
SparkPaimonCatalogFilesystemBackendIT {
@Test
- void testSpark35() {
- String hiveCatalogName = CatalogNameAdaptor.getCatalogName("hive");
- Assertions.assertEquals(GravitinoHiveCatalogSpark35.class.getName(),
hiveCatalogName);
-
- String icebergCatalogName =
CatalogNameAdaptor.getCatalogName("lakehouse-iceberg");
- Assertions.assertEquals(GravitinoIcebergCatalogSpark35.class.getName(),
icebergCatalogName);
+ void testCatalogClassName() {
+ String catalogClass =
+ getSparkSession()
+ .sessionState()
+ .conf()
+ .getConfString("spark.sql.catalog." + getCatalogName());
+ Assertions.assertEquals(GravitinoPaimonCatalogSpark33.class.getName(),
catalogClass);
}
}
diff --git
a/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
b/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
index 1b0af02f8..37c95e478 100644
---
a/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
+++
b/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
@@ -20,6 +20,7 @@ package org.apache.gravitino.spark.connector.version;
import org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark33;
import
org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark33;
+import
org.apache.gravitino.spark.connector.paimon.GravitinoPaimonCatalogSpark33;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -31,5 +32,8 @@ public class TestCatalogNameAdaptor {
String icebergCatalogName =
CatalogNameAdaptor.getCatalogName("lakehouse-iceberg");
Assertions.assertEquals(GravitinoIcebergCatalogSpark33.class.getName(),
icebergCatalogName);
+
+ String paimonCatalogName =
CatalogNameAdaptor.getCatalogName("lakehouse-paimon");
+ Assertions.assertEquals(GravitinoPaimonCatalogSpark33.class.getName(),
paimonCatalogName);
}
}
diff --git a/spark-connector/v3.4/spark/build.gradle.kts
b/spark-connector/v3.4/spark/build.gradle.kts
index f3308fca3..aa4134a3c 100644
--- a/spark-connector/v3.4/spark/build.gradle.kts
+++ b/spark-connector/v3.4/spark/build.gradle.kts
@@ -31,6 +31,7 @@ val scalaVersion: String = project.properties["scalaVersion"]
as? String ?: extr
val sparkVersion: String = libs.versions.spark34.get()
val sparkMajorVersion: String = sparkVersion.substringBeforeLast(".")
val icebergVersion: String = libs.versions.iceberg4spark.get()
+val paimonVersion: String = libs.versions.paimon.get()
val kyuubiVersion: String = libs.versions.kyuubi4spark34.get()
val scalaJava8CompatVersion: String = libs.versions.scala.java.compat.get()
val scalaCollectionCompatVersion: String =
libs.versions.scala.collection.compat.get()
@@ -44,6 +45,9 @@ dependencies {
}
compileOnly(project(":clients:client-java-runtime", configuration =
"shadow"))
compileOnly("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
+
compileOnly("org.apache.paimon:paimon-spark-$sparkMajorVersion:$paimonVersion")
{
+ exclude("org.apache.spark")
+ }
testImplementation(project(":api")) {
exclude("org.apache.logging.log4j")
@@ -122,6 +126,9 @@ dependencies {
testImplementation("org.apache.iceberg:iceberg-core:$icebergVersion")
testImplementation("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
testImplementation("org.apache.iceberg:iceberg-hive-metastore:$icebergVersion")
+
testImplementation("org.apache.paimon:paimon-spark-$sparkMajorVersion:$paimonVersion")
{
+ exclude("org.apache.spark")
+ }
testImplementation("org.apache.kyuubi:kyuubi-spark-connector-hive_$scalaVersion:$kyuubiVersion")
// include spark-sql,spark-catalyst,hive-common,hdfs-client
testImplementation("org.apache.spark:spark-hive_$scalaVersion:$sparkVersion") {
@@ -134,6 +141,9 @@ dependencies {
exclude("com.fasterxml.jackson.core")
}
testImplementation("org.scala-lang.modules:scala-collection-compat_$scalaVersion:$scalaCollectionCompatVersion")
+
testImplementation("org.apache.spark:spark-catalyst_$scalaVersion:$sparkVersion")
+ testImplementation("org.apache.spark:spark-core_$scalaVersion:$sparkVersion")
+ testImplementation("org.apache.spark:spark-sql_$scalaVersion:$sparkVersion")
testRuntimeOnly(libs.junit.jupiter.engine)
}
@@ -152,6 +162,7 @@ tasks.test {
dependsOn(":catalogs:catalog-lakehouse-iceberg:jar")
dependsOn(":catalogs:catalog-hive:jar")
dependsOn(":iceberg:iceberg-rest-server:jar")
+ dependsOn(":catalogs:catalog-lakehouse-paimon:jar")
}
}
diff --git
a/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
b/spark-connector/v3.4/spark/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalogSpark34.java
similarity index 53%
copy from
spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
copy to
spark-connector/v3.4/spark/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalogSpark34.java
index 5295e82fb..eb3e87793 100644
---
a/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
+++
b/spark-connector/v3.4/spark/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalogSpark34.java
@@ -16,20 +16,22 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.gravitino.spark.connector.version;
+package org.apache.gravitino.spark.connector.paimon;
-import org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark35;
-import
org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark35;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+import org.apache.gravitino.spark.connector.SparkTableChangeConverter;
+import org.apache.gravitino.spark.connector.SparkTableChangeConverter34;
+import org.apache.gravitino.spark.connector.SparkTypeConverter;
+import org.apache.gravitino.spark.connector.SparkTypeConverter34;
-public class TestCatalogNameAdaptor {
- @Test
- void testSpark35() {
- String hiveCatalogName = CatalogNameAdaptor.getCatalogName("hive");
- Assertions.assertEquals(GravitinoHiveCatalogSpark35.class.getName(),
hiveCatalogName);
+public class GravitinoPaimonCatalogSpark34 extends GravitinoPaimonCatalog {
+ @Override
+ protected SparkTypeConverter getSparkTypeConverter() {
+ return new SparkTypeConverter34();
+ }
- String icebergCatalogName =
CatalogNameAdaptor.getCatalogName("lakehouse-iceberg");
- Assertions.assertEquals(GravitinoIcebergCatalogSpark35.class.getName(),
icebergCatalogName);
+ @Override
+ protected SparkTableChangeConverter getSparkTableChangeConverter(
+ SparkTypeConverter sparkTypeConverter) {
+ return new SparkTableChangeConverter34(sparkTypeConverter);
}
}
diff --git
a/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
b/spark-connector/v3.4/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT34.java
similarity index 59%
copy from
spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
copy to
spark-connector/v3.4/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT34.java
index 5295e82fb..d23070732 100644
---
a/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
+++
b/spark-connector/v3.4/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT34.java
@@ -16,20 +16,21 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.gravitino.spark.connector.version;
+package org.apache.gravitino.spark.connector.integration.test.paimon;
-import org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark35;
-import
org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark35;
+import
org.apache.gravitino.spark.connector.paimon.GravitinoPaimonCatalogSpark34;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
-public class TestCatalogNameAdaptor {
- @Test
- void testSpark35() {
- String hiveCatalogName = CatalogNameAdaptor.getCatalogName("hive");
- Assertions.assertEquals(GravitinoHiveCatalogSpark35.class.getName(),
hiveCatalogName);
+public class SparkPaimonCatalogFilesystemBackendIT34 extends
SparkPaimonCatalogFilesystemBackendIT {
- String icebergCatalogName =
CatalogNameAdaptor.getCatalogName("lakehouse-iceberg");
- Assertions.assertEquals(GravitinoIcebergCatalogSpark35.class.getName(),
icebergCatalogName);
+ @Test
+ void testCatalogClassName() {
+ String catalogClass =
+ getSparkSession()
+ .sessionState()
+ .conf()
+ .getConfString("spark.sql.catalog." + getCatalogName());
+ Assertions.assertEquals(GravitinoPaimonCatalogSpark34.class.getName(),
catalogClass);
}
}
diff --git
a/spark-connector/v3.4/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
b/spark-connector/v3.4/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
index a2e95c8ea..af9e67fab 100644
---
a/spark-connector/v3.4/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
+++
b/spark-connector/v3.4/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
@@ -20,6 +20,7 @@ package org.apache.gravitino.spark.connector.version;
import org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark34;
import
org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark34;
+import
org.apache.gravitino.spark.connector.paimon.GravitinoPaimonCatalogSpark34;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -31,5 +32,8 @@ public class TestCatalogNameAdaptor {
String icebergCatalogName =
CatalogNameAdaptor.getCatalogName("lakehouse-iceberg");
Assertions.assertEquals(GravitinoIcebergCatalogSpark34.class.getName(),
icebergCatalogName);
+
+ String paimonCatalogName =
CatalogNameAdaptor.getCatalogName("lakehouse-paimon");
+ Assertions.assertEquals(GravitinoPaimonCatalogSpark34.class.getName(),
paimonCatalogName);
}
}
diff --git a/spark-connector/v3.5/spark/build.gradle.kts
b/spark-connector/v3.5/spark/build.gradle.kts
index 7b8cc8447..15aa01808 100644
--- a/spark-connector/v3.5/spark/build.gradle.kts
+++ b/spark-connector/v3.5/spark/build.gradle.kts
@@ -31,6 +31,7 @@ val scalaVersion: String = project.properties["scalaVersion"]
as? String ?: extr
val sparkVersion: String = libs.versions.spark35.get()
val sparkMajorVersion: String = sparkVersion.substringBeforeLast(".")
val icebergVersion: String = libs.versions.iceberg4spark.get()
+val paimonVersion: String = libs.versions.paimon.get()
val kyuubiVersion: String = libs.versions.kyuubi4spark35.get()
val scalaJava8CompatVersion: String = libs.versions.scala.java.compat.get()
val scalaCollectionCompatVersion: String =
libs.versions.scala.collection.compat.get()
@@ -45,6 +46,9 @@ dependencies {
}
compileOnly(project(":clients:client-java-runtime", configuration =
"shadow"))
compileOnly("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
+
compileOnly("org.apache.paimon:paimon-spark-$sparkMajorVersion:$paimonVersion")
{
+ exclude("org.apache.spark")
+ }
testImplementation(project(":api")) {
exclude("org.apache.logging.log4j")
@@ -124,6 +128,9 @@ dependencies {
testImplementation("org.apache.iceberg:iceberg-core:$icebergVersion")
testImplementation("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
testImplementation("org.apache.iceberg:iceberg-hive-metastore:$icebergVersion")
+
testImplementation("org.apache.paimon:paimon-spark-$sparkMajorVersion:$paimonVersion")
{
+ exclude("org.apache.spark")
+ }
testImplementation("org.apache.kyuubi:kyuubi-spark-connector-hive_$scalaVersion:$kyuubiVersion")
// include spark-sql,spark-catalyst,hive-common,hdfs-client
testImplementation("org.apache.spark:spark-hive_$scalaVersion:$sparkVersion") {
@@ -136,6 +143,9 @@ dependencies {
exclude("com.fasterxml.jackson.core")
}
testImplementation("org.scala-lang.modules:scala-collection-compat_$scalaVersion:$scalaCollectionCompatVersion")
+
testImplementation("org.apache.spark:spark-catalyst_$scalaVersion:$sparkVersion")
+ testImplementation("org.apache.spark:spark-core_$scalaVersion:$sparkVersion")
+ testImplementation("org.apache.spark:spark-sql_$scalaVersion:$sparkVersion")
testRuntimeOnly(libs.junit.jupiter.engine)
}
@@ -154,6 +164,7 @@ tasks.test {
dependsOn(":catalogs:catalog-lakehouse-iceberg:jar")
dependsOn(":catalogs:catalog-hive:jar")
dependsOn(":iceberg:iceberg-rest-server:jar")
+ dependsOn(":catalogs:catalog-lakehouse-paimon:jar")
}
}
diff --git
a/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
b/spark-connector/v3.5/spark/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalogSpark35.java
similarity index 52%
copy from
spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
copy to
spark-connector/v3.5/spark/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalogSpark35.java
index 1b0af02f8..2c39af5b2 100644
---
a/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
+++
b/spark-connector/v3.5/spark/src/main/java/org/apache/gravitino/spark/connector/paimon/GravitinoPaimonCatalogSpark35.java
@@ -16,20 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.gravitino.spark.connector.version;
+package org.apache.gravitino.spark.connector.paimon;
-import org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark33;
-import
org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark33;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-public class TestCatalogNameAdaptor {
- @Test
- void testSpark33() {
- String hiveCatalogName = CatalogNameAdaptor.getCatalogName("hive");
- Assertions.assertEquals(GravitinoHiveCatalogSpark33.class.getName(),
hiveCatalogName);
-
- String icebergCatalogName =
CatalogNameAdaptor.getCatalogName("lakehouse-iceberg");
- Assertions.assertEquals(GravitinoIcebergCatalogSpark33.class.getName(),
icebergCatalogName);
- }
-}
+public class GravitinoPaimonCatalogSpark35 extends
GravitinoPaimonCatalogSpark34 {}
diff --git
a/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
b/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT35.java
similarity index 59%
copy from
spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
copy to
spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT35.java
index 5295e82fb..44281c76e 100644
---
a/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
+++
b/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogFilesystemBackendIT35.java
@@ -16,20 +16,21 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.gravitino.spark.connector.version;
+package org.apache.gravitino.spark.connector.integration.test.paimon;
-import org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark35;
-import
org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark35;
+import
org.apache.gravitino.spark.connector.paimon.GravitinoPaimonCatalogSpark35;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
-public class TestCatalogNameAdaptor {
- @Test
- void testSpark35() {
- String hiveCatalogName = CatalogNameAdaptor.getCatalogName("hive");
- Assertions.assertEquals(GravitinoHiveCatalogSpark35.class.getName(),
hiveCatalogName);
+public class SparkPaimonCatalogFilesystemBackendIT35 extends
SparkPaimonCatalogFilesystemBackendIT {
- String icebergCatalogName =
CatalogNameAdaptor.getCatalogName("lakehouse-iceberg");
- Assertions.assertEquals(GravitinoIcebergCatalogSpark35.class.getName(),
icebergCatalogName);
+ @Test
+ void testCatalogClassName() {
+ String catalogClass =
+ getSparkSession()
+ .sessionState()
+ .conf()
+ .getConfString("spark.sql.catalog." + getCatalogName());
+ Assertions.assertEquals(GravitinoPaimonCatalogSpark35.class.getName(),
catalogClass);
}
}
diff --git
a/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
b/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
index 5295e82fb..f02584cd6 100644
---
a/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
+++
b/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/version/TestCatalogNameAdaptor.java
@@ -20,6 +20,7 @@ package org.apache.gravitino.spark.connector.version;
import org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark35;
import
org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark35;
+import
org.apache.gravitino.spark.connector.paimon.GravitinoPaimonCatalogSpark35;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -31,5 +32,8 @@ public class TestCatalogNameAdaptor {
String icebergCatalogName =
CatalogNameAdaptor.getCatalogName("lakehouse-iceberg");
Assertions.assertEquals(GravitinoIcebergCatalogSpark35.class.getName(),
icebergCatalogName);
+
+ String paimonCatalogName =
CatalogNameAdaptor.getCatalogName("lakehouse-paimon");
+ Assertions.assertEquals(GravitinoPaimonCatalogSpark35.class.getName(),
paimonCatalogName);
}
}