This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new fce001768 [#4264] feat(iceberg): support access S3 with static access
key for Iceberg REST server and Gravitino Iceberg catalog (#4250)
fce001768 is described below
commit fce001768123c6c2d75f4fb5cb81bf02511ea08d
Author: FANNG <[email protected]>
AuthorDate: Thu Aug 1 19:46:36 2024 +0800
[#4264] feat(iceberg): support access S3 with static access key for
Iceberg REST server and Gravitino Iceberg catalog (#4250)
### What changes were proposed in this pull request?
1. support using static access-key-id and secret-access-key to access S3
data for iceberg rest server and gravitino iceberg catalog
2. refactor the code to reuse the iceberg catalog transform logic for
connector and Iceberg rest server and Iceberg catalog
3. add configuration to manage the s3 access key
### Why are the changes needed?
Fix: #4264
### Does this PR introduce _any_ user-facing change?
Yes, this change adds some documentation.
### How was this patch tested?
I tested Iceberg REST server and Iceberg catalog with `jdbc` catalog
backend and `hive` catalog backend to access S3 data.
---
catalogs/catalog-common/build.gradle.kts | 1 +
.../lakehouse/iceberg/IcebergConstants.java | 11 ++
.../lakehouse/iceberg/IcebergPropertiesUtils.java | 69 +++++++++++++
.../iceberg/IcebergCatalogPropertiesMetadata.java | 59 ++++-------
.../integration/test/CatalogIcebergBaseIT.java | 7 +-
.../java/org/apache/gravitino/utils/MapUtils.java | 20 ++++
conf/gravitino-iceberg-rest-server.conf.template | 18 +++-
docs/iceberg-rest-service.md | 20 +++-
docs/lakehouse-iceberg-catalog.md | 18 ++++
gradle/libs.versions.toml | 4 +
iceberg/iceberg-common/build.gradle.kts | 3 +
.../gravitino/iceberg/common/IcebergConfig.java | 50 ++++++++-
.../iceberg/common/ops/IcebergTableOps.java | 54 ++++++++--
.../iceberg/common/utils/IcebergCatalogUtil.java | 55 +++++-----
.../common/utils/TestIcebergCatalogUtil.java | 5 +-
.../iceberg/IcebergPropertiesConverter.java | 115 ++-------------------
16 files changed, 321 insertions(+), 188 deletions(-)
diff --git a/catalogs/catalog-common/build.gradle.kts
b/catalogs/catalog-common/build.gradle.kts
index 5bfe52af5..ef3785f7c 100644
--- a/catalogs/catalog-common/build.gradle.kts
+++ b/catalogs/catalog-common/build.gradle.kts
@@ -24,4 +24,5 @@ plugins {
// try to avoid adding extra dependencies because it is used by catalogs and
connectors.
dependencies {
implementation(libs.slf4j.api)
+ implementation(libs.guava)
}
diff --git
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java
index 07417fd3c..4a1f46fd6 100644
---
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java
+++
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java
@@ -34,6 +34,17 @@ public class IcebergConstants {
public static final String URI = "uri";
public static final String CATALOG_BACKEND_NAME = "catalog-backend-name";
+ // IO properties
+ public static final String IO_IMPL = "io-impl";
+ public static final String GRAVITINO_S3_ENDPOINT = "s3-endpoint";
+ public static final String ICEBERG_S3_ENDPOINT = "s3.endpoint";
+ public static final String GRAVITINO_S3_ACCESS_KEY_ID = "s3-access-key-id";
+ public static final String ICEBERG_S3_ACCESS_KEY_ID = "s3.access-key-id";
+ public static final String GRAVITINO_S3_SECRET_ACCESS_KEY =
"s3-secret-access-key";
+ public static final String ICEBERG_S3_SECRET_ACCESS_KEY =
"s3.secret-access-key";
+ public static final String GRAVITINO_S3_REGION = "s3-region";
+ public static final String AWS_S3_REGION = "client.region";
+
// Iceberg Table properties constants
public static final String COMMENT = "comment";
diff --git
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergPropertiesUtils.java
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergPropertiesUtils.java
new file mode 100644
index 000000000..3420daa97
--- /dev/null
+++
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergPropertiesUtils.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.lakehouse.iceberg;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class IcebergPropertiesUtils {
+
+ // Map that maintains the mapping of keys in Gravitino to that in Iceberg,
for example, users
+ // will only need to set the configuration 'catalog-backend' in Gravitino
and Gravitino will
+ // change it to `catalogType` automatically and pass it to Iceberg.
+ public static final Map<String, String> GRAVITINO_CONFIG_TO_ICEBERG;
+
+ static {
+ Map<String, String> map = new HashMap();
+ map.put(IcebergConstants.CATALOG_BACKEND,
IcebergConstants.CATALOG_BACKEND);
+ map.put(IcebergConstants.GRAVITINO_JDBC_DRIVER,
IcebergConstants.GRAVITINO_JDBC_DRIVER);
+ map.put(IcebergConstants.GRAVITINO_JDBC_USER,
IcebergConstants.ICEBERG_JDBC_USER);
+ map.put(IcebergConstants.GRAVITINO_JDBC_PASSWORD,
IcebergConstants.ICEBERG_JDBC_PASSWORD);
+ map.put(IcebergConstants.URI, IcebergConstants.URI);
+ map.put(IcebergConstants.WAREHOUSE, IcebergConstants.WAREHOUSE);
+ map.put(IcebergConstants.CATALOG_BACKEND_NAME,
IcebergConstants.CATALOG_BACKEND_NAME);
+ map.put(IcebergConstants.IO_IMPL, IcebergConstants.IO_IMPL);
+ map.put(IcebergConstants.GRAVITINO_S3_ENDPOINT,
IcebergConstants.ICEBERG_S3_ENDPOINT);
+ map.put(IcebergConstants.GRAVITINO_S3_REGION,
IcebergConstants.AWS_S3_REGION);
+ map.put(IcebergConstants.GRAVITINO_S3_ACCESS_KEY_ID,
IcebergConstants.ICEBERG_S3_ACCESS_KEY_ID);
+ map.put(
+ IcebergConstants.GRAVITINO_S3_SECRET_ACCESS_KEY,
+ IcebergConstants.ICEBERG_S3_SECRET_ACCESS_KEY);
+ GRAVITINO_CONFIG_TO_ICEBERG = Collections.unmodifiableMap(map);
+ }
+
+ /**
+ * Converts Gravitino properties to Iceberg catalog properties, the common
transform logic shared
+ * by Spark connector, Iceberg REST server, Gravitino Iceberg catalog.
+ *
+ * @param gravitinoProperties a map of Gravitino configuration properties.
+ * @return a map containing Iceberg catalog properties.
+ */
+ public static Map<String, String> toIcebergCatalogProperties(
+ Map<String, String> gravitinoProperties) {
+ Map<String, String> icebergProperties = new HashMap<>();
+ gravitinoProperties.forEach(
+ (key, value) -> {
+ if (GRAVITINO_CONFIG_TO_ICEBERG.containsKey(key)) {
+ icebergProperties.put(GRAVITINO_CONFIG_TO_ICEBERG.get(key), value);
+ }
+ });
+ return icebergProperties;
+ }
+}
diff --git
a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogPropertiesMetadata.java
b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogPropertiesMetadata.java
index 8f887816d..71293937e 100644
---
a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogPropertiesMetadata.java
+++
b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogPropertiesMetadata.java
@@ -19,6 +19,7 @@
package org.apache.gravitino.catalog.lakehouse.iceberg;
import static
org.apache.gravitino.connector.PropertyEntry.enumImmutablePropertyEntry;
+import static
org.apache.gravitino.connector.PropertyEntry.stringOptionalPropertyEntry;
import static
org.apache.gravitino.connector.PropertyEntry.stringRequiredPropertyEntry;
import com.google.common.collect.ImmutableList;
@@ -35,41 +36,14 @@ import
org.apache.gravitino.iceberg.common.authentication.kerberos.KerberosConfi
public class IcebergCatalogPropertiesMetadata extends
BaseCatalogPropertiesMetadata {
public static final String CATALOG_BACKEND =
IcebergConstants.CATALOG_BACKEND;
-
public static final String GRAVITINO_JDBC_USER =
IcebergConstants.GRAVITINO_JDBC_USER;
- public static final String ICEBERG_JDBC_USER =
IcebergConstants.ICEBERG_JDBC_USER;
-
public static final String GRAVITINO_JDBC_PASSWORD =
IcebergConstants.GRAVITINO_JDBC_PASSWORD;
- public static final String ICEBERG_JDBC_PASSWORD =
IcebergConstants.ICEBERG_JDBC_PASSWORD;
- public static final String ICEBERG_JDBC_INITIALIZE =
IcebergConstants.ICEBERG_JDBC_INITIALIZE;
-
- public static final String GRAVITINO_JDBC_DRIVER =
IcebergConstants.GRAVITINO_JDBC_DRIVER;
public static final String WAREHOUSE = IcebergConstants.WAREHOUSE;
public static final String URI = IcebergConstants.URI;
public static final String CATALOG_BACKEND_NAME =
IcebergConstants.CATALOG_BACKEND_NAME;
private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA;
- // Map that maintains the mapping of keys in Gravitino to that in Iceberg,
for example, users
- // will only need to set the configuration 'catalog-backend' in Gravitino
and Gravitino will
- // change it to `catalogType` automatically and pass it to Iceberg.
- public static final Map<String, String> GRAVITINO_CONFIG_TO_ICEBERG =
- ImmutableMap.of(
- CATALOG_BACKEND,
- CATALOG_BACKEND,
- GRAVITINO_JDBC_DRIVER,
- GRAVITINO_JDBC_DRIVER,
- GRAVITINO_JDBC_USER,
- ICEBERG_JDBC_USER,
- GRAVITINO_JDBC_PASSWORD,
- ICEBERG_JDBC_PASSWORD,
- URI,
- URI,
- WAREHOUSE,
- WAREHOUSE,
- CATALOG_BACKEND_NAME,
- CATALOG_BACKEND_NAME);
-
public static final Map<String, String>
KERBEROS_CONFIGURATION_FOR_HIVE_BACKEND =
ImmutableMap.of(
KerberosConfig.PRINCIPAL_KEY,
@@ -98,7 +72,17 @@ public class IcebergCatalogPropertiesMetadata extends
BaseCatalogPropertiesMetad
false),
stringRequiredPropertyEntry(URI, "Iceberg catalog uri config",
false, false),
stringRequiredPropertyEntry(
- WAREHOUSE, "Iceberg catalog warehouse config", false, false));
+ WAREHOUSE, "Iceberg catalog warehouse config", false, false),
+ stringOptionalPropertyEntry(
+ IcebergConstants.IO_IMPL, "FileIO implement for Iceberg",
true, null, false),
+ stringOptionalPropertyEntry(
+ IcebergConstants.GRAVITINO_S3_ACCESS_KEY_ID, "s3
access-key-id", true, null, true),
+ stringOptionalPropertyEntry(
+ IcebergConstants.GRAVITINO_S3_SECRET_ACCESS_KEY,
+ "s3 secret-access-key",
+ true,
+ null,
+ true));
HashMap<String, PropertyEntry<?>> result =
Maps.newHashMap(BASIC_CATALOG_PROPERTY_ENTRIES);
result.putAll(Maps.uniqueIndex(propertyEntries, PropertyEntry::getName));
result.putAll(KerberosConfig.KERBEROS_PROPERTY_ENTRIES);
@@ -111,18 +95,15 @@ public class IcebergCatalogPropertiesMetadata extends
BaseCatalogPropertiesMetad
return PROPERTIES_METADATA;
}
- public Map<String, String> transformProperties(Map<String, String>
properties) {
- Map<String, String> gravitinoConfig = Maps.newHashMap();
- properties.forEach(
- (key, value) -> {
- if (GRAVITINO_CONFIG_TO_ICEBERG.containsKey(key)) {
- gravitinoConfig.put(GRAVITINO_CONFIG_TO_ICEBERG.get(key), value);
- }
-
- if (KERBEROS_CONFIGURATION_FOR_HIVE_BACKEND.containsKey(key)) {
-
gravitinoConfig.put(KERBEROS_CONFIGURATION_FOR_HIVE_BACKEND.get(key), value);
+ public Map<String, String> transformProperties(Map<String, String>
gravitinoProperties) {
+ Map<String, String> icebergProperties =
+ IcebergPropertiesUtils.toIcebergCatalogProperties(gravitinoProperties);
+ gravitinoProperties.forEach(
+ (k, v) -> {
+ if (KERBEROS_CONFIGURATION_FOR_HIVE_BACKEND.containsKey(k)) {
+
icebergProperties.put(KERBEROS_CONFIGURATION_FOR_HIVE_BACKEND.get(k), v);
}
});
- return gravitinoConfig;
+ return icebergProperties;
}
}
diff --git
a/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java
b/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java
index 7fd503550..3c2f14e36 100644
---
a/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java
+++
b/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java
@@ -35,6 +35,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@@ -54,6 +55,7 @@ import org.apache.gravitino.dto.util.DTOConverters;
import org.apache.gravitino.exceptions.NoSuchSchemaException;
import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
import org.apache.gravitino.exceptions.TableAlreadyExistsException;
+import org.apache.gravitino.iceberg.common.IcebergCatalogBackend;
import org.apache.gravitino.iceberg.common.IcebergConfig;
import org.apache.gravitino.iceberg.common.utils.IcebergCatalogUtil;
import org.apache.gravitino.integration.test.container.ContainerSuite;
@@ -219,7 +221,10 @@ public abstract class CatalogIcebergBaseIT extends
AbstractIT {
icebergCatalogProperties.put(
IcebergConfig.CATALOG_BACKEND_NAME.getKey(),
icebergCatalogBackendName);
- icebergCatalog = IcebergCatalogUtil.loadCatalogBackend(TYPE,
icebergCatalogProperties);
+ icebergCatalog =
+ IcebergCatalogUtil.loadCatalogBackend(
+ IcebergCatalogBackend.valueOf(TYPE.toUpperCase(Locale.ROOT)),
+ new IcebergConfig(icebergCatalogProperties));
if (icebergCatalog instanceof SupportsNamespaces) {
icebergSupportsNamespaces =
(org.apache.iceberg.catalog.SupportsNamespaces) icebergCatalog;
}
diff --git a/common/src/main/java/org/apache/gravitino/utils/MapUtils.java
b/common/src/main/java/org/apache/gravitino/utils/MapUtils.java
index f2c0281dd..8377a79c3 100644
--- a/common/src/main/java/org/apache/gravitino/utils/MapUtils.java
+++ b/common/src/main/java/org/apache/gravitino/utils/MapUtils.java
@@ -22,6 +22,7 @@ package org.apache.gravitino.utils;
import com.google.common.collect.Maps;
import java.util.Collections;
import java.util.Map;
+import java.util.function.Predicate;
/** Utility class for working with maps. */
public class MapUtils {
@@ -47,6 +48,25 @@ public class MapUtils {
return Collections.unmodifiableMap(configs);
}
+ /**
+ * Returns a map with all keys that match the predicate.
+ *
+ * @param m The map to filter.
+ * @param predicate The predicate expression to filter the keys.
+ * @return A map with all keys that match the predicate.
+ */
+ public static Map<String, String> getFilteredMap(Map<String, String> m,
Predicate predicate) {
+ Map<String, String> configs = Maps.newHashMap();
+ m.forEach(
+ (k, v) -> {
+ if (predicate.test(k)) {
+ configs.put(k, v);
+ }
+ });
+
+ return Collections.unmodifiableMap(configs);
+ }
+
/**
* Returns an unmodifiable map.
*
diff --git a/conf/gravitino-iceberg-rest-server.conf.template
b/conf/gravitino-iceberg-rest-server.conf.template
index ed3f8916b..77d93a35a 100644
--- a/conf/gravitino-iceberg-rest-server.conf.template
+++ b/conf/gravitino-iceberg-rest-server.conf.template
@@ -44,4 +44,20 @@ gravitino.iceberg-rest.responseHeaderSize = 131072
# The Iceberg catalog backend, it's recommended to change to hive or jdbc
gravitino.iceberg-rest.catalog-backend = memory
# The warehouse directory of Iceberg catalog
-gravitino.iceberg-rest.warehouse = /tmp/
+gravitino.iceberg-rest.warehouse = /tmp
+
+# THE CONFIGURATION EXAMPLE FOR JDBC CATALOG BACKEND WITH S3 SUPPORT
+
+# gravitino.iceberg-rest.catalog-backend = jdbc
+# gravitino.iceberg-rest.jdbc-driver = org.postgresql.Driver
+# gravitino.iceberg-rest.uri = jdbc:postgresql://127.0.0.1:5432/postgres
+# gravitino.iceberg-rest.jdbc-user = postgres
+# gravitino.iceberg-rest.jdbc-password = abc123
+# gravitino.iceberg-rest.jdbc-initialize = true
+# change to s3a://test/my/key/prefix for Hive catalog backend
+# gravitino.iceberg-rest.warehouse = s3://test/my/key/prefix
+# gravitino.iceberg-rest.io-impl= org.apache.iceberg.aws.s3.S3FileIO
+# gravitino.iceberg-rest.s3-access-key-id = xxx
+# gravitino.iceberg-rest.s3-secret-access-key = xxx
+# gravitino.iceberg-rest.s3-endpoint = http://192.168.215.4:9010
+# gravitino.iceberg-rest.s3-region = xxx
diff --git a/docs/iceberg-rest-service.md b/docs/iceberg-rest-service.md
index 981ca8fa2..0e7f93668 100644
--- a/docs/iceberg-rest-service.md
+++ b/docs/iceberg-rest-service.md
@@ -119,6 +119,24 @@ The `clients` property for example:
`catalog-impl` has no effect.
:::
+### S3 configuration
+
+Gravitino Iceberg REST service supports using static access-key-id and
secret-access-key to access S3 data.
+
+| Configuration item | Description
| Default value | Required | Since Version |
+|-----------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------|----------|---------------|
+| `gravitino.iceberg-rest.io-impl` | The IO implementation for
`FileIO` in Iceberg, use `org.apache.iceberg.aws.s3.S3FileIO` for S3.
| (none) | No | 0.6.0 |
+| `gravitino.iceberg-rest.s3-access-key-id` | The static access key ID
used to access S3 data.
| (none) | No | 0.6.0 |
+| `gravitino.iceberg-rest.s3-secret-access-key` | The static secret access key
used to access S3 data.
| (none) | No | 0.6.0 |
+| `gravitino.iceberg-rest.s3-endpoint`          | An alternative endpoint of
the S3 service. This could be used for S3FileIO with any S3-compatible object
storage service that has a different endpoint, or to access a private S3 endpoint
in a virtual private cloud. | (none) | No | 0.6.0 |
+| `gravitino.iceberg-rest.s3-region` | The region of the S3
service, like `us-west-2`.
| (none) | No | 0.6.0 |
+
+For other Iceberg S3 properties not managed by Gravitino, like `s3.sse.type`,
you can configure them directly via `gravitino.iceberg-rest.s3.sse.type`.
+
+:::info
+Please set `gravitino.iceberg-rest.warehouse` to
`s3://{bucket_name}/${prefix_name}` for Jdbc catalog backend,
`s3a://{bucket_name}/${prefix_name}` for Hive catalog backend.
+:::
+
### HDFS configuration
The Gravitino Iceberg REST catalog service adds the HDFS configuration files
`core-site.xml` and `hdfs-site.xml` from the directory defined by
`gravitino.auxService.iceberg-rest.classpath`, for example,
`catalogs/lakehouse-iceberg/conf`, to the classpath.
@@ -163,7 +181,7 @@ For example, we can configure Spark catalog options to use
Gravitino Iceberg RES
--conf spark.sql.catalog.rest.uri=http://127.0.0.1:9001/iceberg/
```
-You may need to adjust the Iceberg Spark runtime jar file name according to
the real version number in your environment.
+You may need to adjust the Iceberg Spark runtime jar file name according to
the real version number in your environment. If you want to access the data
stored in S3, you need to download the
[iceberg-aws-bundle](https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-aws-bundle)
jar and place it in the classpath of Spark; no extra config is needed, because
the S3-related properties are transferred from the Iceberg REST server to the
Iceberg REST client automatically.
### Exploring Apache Iceberg with Apache Spark SQL
diff --git a/docs/lakehouse-iceberg-catalog.md
b/docs/lakehouse-iceberg-catalog.md
index 61e7c4fcf..7951fd57b 100644
--- a/docs/lakehouse-iceberg-catalog.md
+++ b/docs/lakehouse-iceberg-catalog.md
@@ -71,6 +71,24 @@ If you have a JDBC Iceberg catalog prior, you must set
`catalog-backend-name` to
You must download the corresponding JDBC driver to the
`catalogs/lakehouse-iceberg/libs` directory.
:::
+#### S3
+
+Supports using static access-key-id and secret-access-key to access S3 data.
+
+| Configuration item | Description
| Default value | Required | Since Version |
+|------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------|----------|---------------|
+| `io-impl`              | The IO implementation for `FileIO` in Iceberg, use
`org.apache.iceberg.aws.s3.S3FileIO` for S3.
| (none) | No | 0.6.0 |
+| `s3-access-key-id` | The static access key ID used to access S3 data.
| (none) | No | 0.6.0 |
+| `s3-secret-access-key` | The static secret access key used to access S3
data.
| (none) | No | 0.6.0 |
+| `s3-endpoint`          | An alternative endpoint of the S3 service. This
could be used for S3FileIO with any S3-compatible object storage service that
has a different endpoint, or to access a private S3 endpoint in a virtual private
cloud. | (none) | No | 0.6.0 |
+| `s3-region` | The region of the S3 service, like `us-west-2`.
| (none) | No | 0.6.0 |
+
+For other Iceberg S3 properties not managed by Gravitino, like `s3.sse.type`,
you can configure them directly via `gravitino.bypass.s3.sse.type`.
+
+:::info
+Please set `warehouse` to
`s3://{bucket_name}/${prefix_name}` for JDBC catalog backend,
`s3a://{bucket_name}/${prefix_name}` for Hive catalog backend.
+:::
+
### Catalog operations
Please refer to [Manage Relational Metadata Using
Gravitino](./manage-relational-metadata-using-gravitino.md#catalog-operations)
for more details.
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 00626f154..ec5064009 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -17,6 +17,7 @@
# under the License.
#
[versions]
+aws = "2.26.20"
junit = "5.8.1"
protoc = "3.24.4"
jackson = "2.15.2"
@@ -92,6 +93,8 @@ sun-activation-version = "1.2.0"
error-prone = "3.1.0"
[libraries]
+aws-s3 = { group = "software.amazon.awssdk", name = "s3", version.ref = "aws" }
+aws-sts = { group = "software.amazon.awssdk", name = "sts", version.ref =
"aws" }
protobuf-java = { group = "com.google.protobuf", name = "protobuf-java",
version.ref = "protoc" }
protobuf-java-util = { group = "com.google.protobuf", name =
"protobuf-java-util", version.ref = "protoc" }
jackson-databind = { group = "com.fasterxml.jackson.core", name =
"jackson-databind", version.ref = "jackson" }
@@ -149,6 +152,7 @@ commons-io = { group = "commons-io", name = "commons-io",
version.ref = "commons
caffeine = { group = "com.github.ben-manes.caffeine", name = "caffeine",
version.ref = "caffeine" }
rocksdbjni = { group = "org.rocksdb", name = "rocksdbjni", version.ref =
"rocksdbjni" }
commons-collections4 = { group = "org.apache.commons", name =
"commons-collections4", version.ref = "commons-collections4" }
+iceberg-aws = { group = "org.apache.iceberg", name = "iceberg-aws",
version.ref = "iceberg" }
iceberg-core = { group = "org.apache.iceberg", name = "iceberg-core",
version.ref = "iceberg" }
iceberg-api = { group = "org.apache.iceberg", name = "iceberg-api",
version.ref = "iceberg" }
iceberg-hive-metastore = { group = "org.apache.iceberg", name =
"iceberg-hive-metastore", version.ref = "iceberg" }
diff --git a/iceberg/iceberg-common/build.gradle.kts
b/iceberg/iceberg-common/build.gradle.kts
index c27f848d6..be7542b5d 100644
--- a/iceberg/iceberg-common/build.gradle.kts
+++ b/iceberg/iceberg-common/build.gradle.kts
@@ -31,9 +31,12 @@ dependencies {
implementation(project(":server-common"))
implementation(libs.bundles.iceberg)
implementation(libs.bundles.log4j)
+ implementation(libs.aws.s3)
+ implementation(libs.aws.sts)
implementation(libs.caffeine)
implementation(libs.commons.lang3)
implementation(libs.guava)
+ implementation(libs.iceberg.aws)
implementation(libs.iceberg.hive.metastore)
implementation(libs.hadoop2.common) {
exclude("com.github.spotbugs")
diff --git
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java
index bad696754..8bf9b86d0 100644
---
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java
+++
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java
@@ -25,6 +25,7 @@ import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Config;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
+import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergPropertiesUtils;
import org.apache.gravitino.config.ConfigBuilder;
import org.apache.gravitino.config.ConfigConstants;
import org.apache.gravitino.config.ConfigEntry;
@@ -32,6 +33,7 @@ import org.apache.gravitino.server.web.JettyServerConfig;
import org.apache.gravitino.server.web.OverwriteDefaultConfig;
public class IcebergConfig extends Config implements OverwriteDefaultConfig {
+
public static final String ICEBERG_CONFIG_PREFIX = "gravitino.iceberg-rest.";
public static final ConfigEntry<String> CATALOG_BACKEND =
@@ -72,7 +74,6 @@ public class IcebergConfig extends Config implements
OverwriteDefaultConfig {
.stringConf()
.checkValue(StringUtils::isNotBlank,
ConfigConstants.NOT_BLANK_ERROR_MSG)
.create();
-
public static final ConfigEntry<String> JDBC_DRIVER =
new ConfigBuilder(IcebergConstants.GRAVITINO_JDBC_DRIVER)
.doc("The driver of the Jdbc connection")
@@ -88,6 +89,45 @@ public class IcebergConfig extends Config implements
OverwriteDefaultConfig {
.booleanConf()
.createWithDefault(true);
+ public static final ConfigEntry<String> IO_IMPL =
+ new ConfigBuilder(IcebergConstants.IO_IMPL)
+ .doc("The io implementation for `FileIO` in Iceberg")
+ .version(ConfigConstants.VERSION_0_6_0)
+ .stringConf()
+ .create();
+
+ public static final ConfigEntry<String> S3_ENDPOINT =
+ new ConfigBuilder(IcebergConstants.GRAVITINO_S3_ENDPOINT)
+ .doc(
+ "An alternative endpoint of the S3 service, This could be used
to for S3FileIO with "
+ + "any s3-compatible object storage service that has a
different endpoint, or "
+ + "access a private S3 endpoint in a virtual private cloud")
+ .version(ConfigConstants.VERSION_0_6_0)
+ .stringConf()
+ .create();
+
+ public static final ConfigEntry<String> S3_REGION =
+ new ConfigBuilder(IcebergConstants.GRAVITINO_S3_REGION)
+ .doc("The region of the S3 service")
+ .version(ConfigConstants.VERSION_0_6_0)
+ .stringConf()
+ .checkValue(StringUtils::isNotBlank,
ConfigConstants.NOT_BLANK_ERROR_MSG)
+ .create();
+
+ public static final ConfigEntry<String> S3_ACCESS_KEY_ID =
+ new ConfigBuilder(IcebergConstants.GRAVITINO_S3_ACCESS_KEY_ID)
+ .doc("The static access key ID used to access S3 data")
+ .version(ConfigConstants.VERSION_0_6_0)
+ .stringConf()
+ .create();
+
+ public static final ConfigEntry<String> S3_SECRET_ACCESS_KEY =
+ new ConfigBuilder(IcebergConstants.GRAVITINO_S3_SECRET_ACCESS_KEY)
+ .doc("The static secret access key used to access S3 data")
+ .version(ConfigConstants.VERSION_0_6_0)
+ .stringConf()
+ .create();
+
public static final ConfigEntry<String> ICEBERG_METRICS_STORE =
new ConfigBuilder(IcebergConstants.ICEBERG_METRICS_STORE)
.doc("The store to save Iceberg metrics")
@@ -135,6 +175,14 @@ public class IcebergConfig extends Config implements
OverwriteDefaultConfig {
super(false);
}
+ public Map<String, String> getIcebergCatalogProperties() {
+ Map<String, String> config = getAllConfig();
+ Map<String, String> transformedConfig =
+ IcebergPropertiesUtils.toIcebergCatalogProperties(config);
+ transformedConfig.putAll(config);
+ return transformedConfig;
+ }
+
@Override
public Map<String, String> getOverwriteDefaultConfig() {
return ImmutableMap.of(
diff --git
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergTableOps.java
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergTableOps.java
index 1f53b640e..56ca6e505 100644
---
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergTableOps.java
+++
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergTableOps.java
@@ -19,16 +19,23 @@
package org.apache.gravitino.iceberg.common.ops;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
import java.sql.Driver;
import java.sql.DriverManager;
import java.util.Collections;
+import java.util.Locale;
+import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import java.util.function.Supplier;
import lombok.Getter;
import lombok.Setter;
+import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
import org.apache.gravitino.iceberg.common.IcebergCatalogBackend;
import org.apache.gravitino.iceberg.common.IcebergConfig;
import org.apache.gravitino.iceberg.common.utils.IcebergCatalogUtil;
import org.apache.gravitino.utils.IsolatedClassLoader;
+import org.apache.gravitino.utils.MapUtils;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
@@ -51,23 +58,39 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class IcebergTableOps implements AutoCloseable {
+
public static final Logger LOG =
LoggerFactory.getLogger(IcebergTableOps.class);
@Getter protected Catalog catalog;
private SupportsNamespaces asNamespaceCatalog;
- private final String catalogType;
+ private final IcebergCatalogBackend catalogBackend;
private String catalogUri = null;
+ private Map<String, String> catalogConfigToClients;
+ private static final Set<String> catalogPropertiesToClientKeys =
+ ImmutableSet.of(
+ IcebergConstants.IO_IMPL,
+ IcebergConstants.AWS_S3_REGION,
+ IcebergConstants.ICEBERG_S3_ACCESS_KEY_ID,
+ IcebergConstants.ICEBERG_S3_SECRET_ACCESS_KEY,
+ IcebergConstants.ICEBERG_S3_ENDPOINT);
public IcebergTableOps(IcebergConfig icebergConfig) {
- this.catalogType = icebergConfig.get(IcebergConfig.CATALOG_BACKEND);
- if (!IcebergCatalogBackend.MEMORY.name().equalsIgnoreCase(catalogType)) {
+ this.catalogBackend =
+ IcebergCatalogBackend.valueOf(
+
icebergConfig.get(IcebergConfig.CATALOG_BACKEND).toUpperCase(Locale.ROOT));
+ if (!IcebergCatalogBackend.MEMORY.equals(catalogBackend)) {
+ // check whether IcebergConfig.CATALOG_WAREHOUSE exists
icebergConfig.get(IcebergConfig.CATALOG_WAREHOUSE);
this.catalogUri = icebergConfig.get(IcebergConfig.CATALOG_URI);
}
- catalog = IcebergCatalogUtil.loadCatalogBackend(catalogType,
icebergConfig.getAllConfig());
+ this.catalog = IcebergCatalogUtil.loadCatalogBackend(catalogBackend,
icebergConfig);
if (catalog instanceof SupportsNamespaces) {
- asNamespaceCatalog = (SupportsNamespaces) catalog;
+ this.asNamespaceCatalog = (SupportsNamespaces) catalog;
}
+ this.catalogConfigToClients =
+ MapUtils.getFilteredMap(
+ icebergConfig.getIcebergCatalogProperties(),
+ key -> catalogPropertiesToClientKeys.contains(key));
}
public IcebergTableOps() {
@@ -119,9 +142,9 @@ public class IcebergTableOps implements AutoCloseable {
public LoadTableResponse createTable(Namespace namespace, CreateTableRequest
request) {
request.validate();
if (request.stageCreate()) {
- return CatalogHandlers.stageTableCreate(catalog, namespace, request);
+ return injectTableConfig(() -> CatalogHandlers.stageTableCreate(catalog,
namespace, request));
}
- return CatalogHandlers.createTable(catalog, namespace, request);
+ return injectTableConfig(() -> CatalogHandlers.createTable(catalog,
namespace, request));
}
public void dropTable(TableIdentifier tableIdentifier) {
@@ -133,7 +156,7 @@ public class IcebergTableOps implements AutoCloseable {
}
public LoadTableResponse loadTable(TableIdentifier tableIdentifier) {
- return CatalogHandlers.loadTable(catalog, tableIdentifier);
+ return injectTableConfig(() -> CatalogHandlers.loadTable(catalog,
tableIdentifier));
}
public boolean tableExists(TableIdentifier tableIdentifier) {
@@ -177,7 +200,7 @@ public class IcebergTableOps implements AutoCloseable {
closeMySQLCatalogResource();
} else if (catalogUri != null && catalogUri.contains("postgresql")) {
closePostgreSQLCatalogResource();
- } else if
(catalogType.equalsIgnoreCase(IcebergCatalogBackend.HIVE.name())) {
+ } else if (catalogBackend.equals(IcebergCatalogBackend.HIVE)) {
// TODO(yuqi) add close for other catalog types such as Hive catalog, for
more, please refer to
//
https://github.com/apache/gravitino/pull/2548/commits/ab876b69b7e094bbd8c174d48a2365a18ed5176d
}
@@ -217,6 +240,19 @@ public class IcebergTableOps implements AutoCloseable {
closeDriverLoadedByIsolatedClassLoader(catalogUri);
}
+ // Some io and security configuration should pass to Iceberg REST client
+ private LoadTableResponse injectTableConfig(Supplier<LoadTableResponse>
supplier) {
+ LoadTableResponse loadTableResponse = supplier.get();
+ return LoadTableResponse.builder()
+ .withTableMetadata(loadTableResponse.tableMetadata())
+ .addAllConfig(getCatalogConfigToClient())
+ .build();
+ }
+
+ private Map<String, String> getCatalogConfigToClient() {
+ return catalogConfigToClients;
+ }
+
@Getter
@Setter
public static final class IcebergTableChange {
diff --git
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/utils/IcebergCatalogUtil.java
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/utils/IcebergCatalogUtil.java
index 161526df7..8f171bea6 100644
---
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/utils/IcebergCatalogUtil.java
+++
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/utils/IcebergCatalogUtil.java
@@ -21,11 +21,13 @@ package org.apache.gravitino.iceberg.common.utils;
import static
org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
import static
org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Locale;
import java.util.Map;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
import org.apache.gravitino.iceberg.common.ClosableHiveCatalog;
@@ -49,23 +51,22 @@ public class IcebergCatalogUtil {
private static final Logger LOG =
LoggerFactory.getLogger(IcebergCatalogUtil.class);
- private static InMemoryCatalog loadMemoryCatalog(Map<String, String>
properties) {
- IcebergConfig icebergConfig = new IcebergConfig(properties);
+ private static InMemoryCatalog loadMemoryCatalog(IcebergConfig
icebergConfig) {
String icebergCatalogName = icebergConfig.getCatalogBackendName("memory");
InMemoryCatalog memoryCatalog = new InMemoryCatalog();
- Map<String, String> resultProperties = new HashMap<>(properties);
+ Map<String, String> resultProperties =
icebergConfig.getIcebergCatalogProperties();
resultProperties.put(CatalogProperties.WAREHOUSE_LOCATION, "/tmp");
memoryCatalog.initialize(icebergCatalogName, resultProperties);
return memoryCatalog;
}
- private static HiveCatalog loadHiveCatalog(Map<String, String> properties) {
+ private static HiveCatalog loadHiveCatalog(IcebergConfig icebergConfig) {
ClosableHiveCatalog hiveCatalog = new ClosableHiveCatalog();
HdfsConfiguration hdfsConfiguration = new HdfsConfiguration();
- properties.forEach(hdfsConfiguration::set);
- IcebergConfig icebergConfig = new IcebergConfig(properties);
String icebergCatalogName = icebergConfig.getCatalogBackendName("hive");
+ Map<String, String> properties =
icebergConfig.getIcebergCatalogProperties();
+ properties.forEach(hdfsConfiguration::set);
AuthenticationConfig authenticationConfig = new
AuthenticationConfig(properties);
if (authenticationConfig.isSimpleAuth()) {
hiveCatalog.setConf(hdfsConfiguration);
@@ -107,18 +108,17 @@ public class IcebergCatalogUtil {
}
}
- private static JdbcCatalog loadJdbcCatalog(Map<String, String> properties) {
- IcebergConfig icebergConfig = new IcebergConfig(properties);
+ private static JdbcCatalog loadJdbcCatalog(IcebergConfig icebergConfig) {
String driverClassName = icebergConfig.getJdbcDriver();
String icebergCatalogName = icebergConfig.getCatalogBackendName("jdbc");
+ Map<String, String> properties =
icebergConfig.getIcebergCatalogProperties();
Preconditions.checkNotNull(
properties.get(IcebergConstants.ICEBERG_JDBC_USER),
IcebergConstants.ICEBERG_JDBC_USER + " is null");
Preconditions.checkNotNull(
properties.get(IcebergConstants.ICEBERG_JDBC_PASSWORD),
IcebergConstants.ICEBERG_JDBC_PASSWORD + " is null");
-
try {
// Load the jdbc driver
Class.forName(driverClassName);
@@ -126,11 +126,8 @@ public class IcebergCatalogUtil {
throw new IllegalArgumentException("Couldn't load jdbc driver " +
driverClassName);
}
JdbcCatalog jdbcCatalog =
- new JdbcCatalog(
- null,
- null,
- Boolean.parseBoolean(
-
properties.getOrDefault(IcebergConstants.ICEBERG_JDBC_INITIALIZE, "true")));
+ new JdbcCatalog(null, null,
icebergConfig.get(IcebergConfig.JDBC_INIT_TABLES));
+
HdfsConfiguration hdfsConfiguration = new HdfsConfiguration();
properties.forEach(hdfsConfiguration::set);
jdbcCatalog.setConf(hdfsConfiguration);
@@ -138,37 +135,41 @@ public class IcebergCatalogUtil {
return jdbcCatalog;
}
- private static Catalog loadRestCatalog(Map<String, String> properties) {
- IcebergConfig icebergConfig = new IcebergConfig(properties);
+ private static Catalog loadRestCatalog(IcebergConfig icebergConfig) {
String icebergCatalogName = icebergConfig.getCatalogBackendName("rest");
RESTCatalog restCatalog = new RESTCatalog();
HdfsConfiguration hdfsConfiguration = new HdfsConfiguration();
+ Map<String, String> properties =
icebergConfig.getIcebergCatalogProperties();
properties.forEach(hdfsConfiguration::set);
restCatalog.setConf(hdfsConfiguration);
restCatalog.initialize(icebergCatalogName, properties);
return restCatalog;
}
- public static Catalog loadCatalogBackend(String catalogType) {
- return loadCatalogBackend(catalogType, Collections.emptyMap());
+ @VisibleForTesting
+ static Catalog loadCatalogBackend(String catalogType) {
+ return loadCatalogBackend(
+ IcebergCatalogBackend.valueOf(catalogType.toUpperCase(Locale.ROOT)),
+ new IcebergConfig(Collections.emptyMap()));
}
- public static Catalog loadCatalogBackend(String catalogType, Map<String,
String> properties) {
- LOG.info("Load catalog backend of {}", catalogType);
- switch (IcebergCatalogBackend.valueOf(catalogType.toUpperCase())) {
+ public static Catalog loadCatalogBackend(
+ IcebergCatalogBackend catalogBackend, IcebergConfig icebergConfig) {
+ LOG.info("Load catalog backend of {}", catalogBackend);
+ switch (catalogBackend) {
case MEMORY:
- return loadMemoryCatalog(properties);
+ return loadMemoryCatalog(icebergConfig);
case HIVE:
- return loadHiveCatalog(properties);
+ return loadHiveCatalog(icebergConfig);
case JDBC:
- return loadJdbcCatalog(properties);
+ return loadJdbcCatalog(icebergConfig);
case REST:
- return loadRestCatalog(properties);
+ return loadRestCatalog(icebergConfig);
default:
throw new RuntimeException(
- catalogType
+ catalogBackend
+ " catalog is not supported yet, supported catalogs: MEMORY, HIVE, JDBC, REST; got: "
- + catalogType);
+ + catalogBackend);
}
}
diff --git
a/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/utils/TestIcebergCatalogUtil.java
b/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/utils/TestIcebergCatalogUtil.java
index 5cd34aeb1..289858ae3 100644
---
a/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/utils/TestIcebergCatalogUtil.java
+++
b/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/utils/TestIcebergCatalogUtil.java
@@ -22,6 +22,7 @@ package org.apache.gravitino.iceberg.common.utils;
import java.util.HashMap;
import java.util.Map;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
+import org.apache.gravitino.iceberg.common.IcebergCatalogBackend;
import org.apache.gravitino.iceberg.common.IcebergConfig;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.catalog.Catalog;
@@ -66,7 +67,9 @@ public class TestIcebergCatalogUtil {
properties.put(IcebergConstants.ICEBERG_JDBC_USER, "test");
properties.put(IcebergConstants.ICEBERG_JDBC_PASSWORD, "test");
properties.put(IcebergConstants.ICEBERG_JDBC_INITIALIZE, "false");
- catalog = IcebergCatalogUtil.loadCatalogBackend("jdbc", properties);
+ catalog =
+ IcebergCatalogUtil.loadCatalogBackend(
+ IcebergCatalogBackend.JDBC, new IcebergConfig(properties));
Assertions.assertTrue(catalog instanceof JdbcCatalog);
Assertions.assertThrowsExactly(
diff --git
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/iceberg/IcebergPropertiesConverter.java
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/iceberg/IcebergPropertiesConverter.java
index a79f95297..45125f6df 100644
---
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/iceberg/IcebergPropertiesConverter.java
+++
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/iceberg/IcebergPropertiesConverter.java
@@ -21,9 +21,10 @@ package org.apache.gravitino.spark.connector.iceberg;
import com.google.common.base.Preconditions;
import java.util.HashMap;
-import java.util.Locale;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
+import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergPropertiesUtils;
import org.apache.gravitino.spark.connector.PropertiesConverter;
/** Transform Apache Iceberg catalog properties between Apache Spark and
Apache Gravitino. */
@@ -43,32 +44,13 @@ public class IcebergPropertiesConverter implements
PropertiesConverter {
public Map<String, String> toSparkCatalogProperties(Map<String, String>
properties) {
Preconditions.checkArgument(
properties != null, "Iceberg Catalog properties should not be null");
-
- String catalogBackend =
-
properties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND);
+ Map<String, String> all =
IcebergPropertiesUtils.toIcebergCatalogProperties(properties);
+ String catalogBackend = all.remove(IcebergConstants.CATALOG_BACKEND);
Preconditions.checkArgument(
- StringUtils.isNotBlank(catalogBackend), "Iceberg Catalog backend
should not be empty.");
-
- HashMap<String, String> all = new HashMap<>();
-
- switch (catalogBackend.toLowerCase(Locale.ROOT)) {
- case IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND_HIVE:
- initHiveProperties(properties, all);
- break;
- case IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND_JDBC:
- initJdbcProperties(properties, all);
- break;
- case IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND_REST:
- initRestProperties(properties, all);
- break;
- default:
- // SparkCatalog does not support Memory type catalog
- throw new IllegalArgumentException(
- "Unsupported Iceberg Catalog backend: " + catalogBackend);
- }
-
+ StringUtils.isNotBlank(catalogBackend),
+ String.format("%s should not be empty",
IcebergConstants.CATALOG_BACKEND));
+ all.put(IcebergPropertiesConstants.ICEBERG_CATALOG_TYPE, catalogBackend);
all.put(IcebergPropertiesConstants.ICEBERG_CATALOG_CACHE_ENABLED, "FALSE");
-
return all;
}
@@ -81,87 +63,4 @@ public class IcebergPropertiesConverter implements
PropertiesConverter {
public Map<String, String> toSparkTableProperties(Map<String, String>
properties) {
return new HashMap<>(properties);
}
-
- private void initHiveProperties(
- Map<String, String> gravitinoProperties, HashMap<String, String>
icebergProperties) {
- String metastoreUri =
-
gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI);
- Preconditions.checkArgument(
- StringUtils.isNotBlank(metastoreUri),
- "Couldn't get "
- + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI
- + " from Iceberg Catalog properties");
- String hiveWarehouse =
-
gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE);
- Preconditions.checkArgument(
- StringUtils.isNotBlank(hiveWarehouse),
- "Couldn't get "
- + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE
- + " from Iceberg Catalog properties");
- icebergProperties.put(
- IcebergPropertiesConstants.ICEBERG_CATALOG_TYPE,
- IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_HIVE);
- icebergProperties.put(IcebergPropertiesConstants.ICEBERG_CATALOG_URI,
metastoreUri);
-
icebergProperties.put(IcebergPropertiesConstants.ICEBERG_CATALOG_WAREHOUSE,
hiveWarehouse);
- }
-
- private void initJdbcProperties(
- Map<String, String> gravitinoProperties, HashMap<String, String>
icebergProperties) {
- String jdbcUri =
-
gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI);
- Preconditions.checkArgument(
- StringUtils.isNotBlank(jdbcUri),
- "Couldn't get "
- + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI
- + " from Iceberg Catalog properties");
- String jdbcWarehouse =
-
gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE);
- Preconditions.checkArgument(
- StringUtils.isNotBlank(jdbcWarehouse),
- "Couldn't get "
- + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE
- + " from Iceberg Catalog properties");
- String jdbcUser =
-
gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_JDBC_USER);
- Preconditions.checkArgument(
- StringUtils.isNotBlank(jdbcUser),
- "Couldn't get "
- + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_JDBC_USER
- + " from Iceberg Catalog properties");
- String jdbcPassword =
-
gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_JDBC_PASSWORD);
- Preconditions.checkArgument(
- StringUtils.isNotBlank(jdbcPassword),
- "Couldn't get "
- +
IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_JDBC_PASSWORD
- + " from Iceberg Catalog properties");
- icebergProperties.put(
- IcebergPropertiesConstants.ICEBERG_CATALOG_TYPE,
- IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_JDBC);
- icebergProperties.put(IcebergPropertiesConstants.ICEBERG_CATALOG_URI,
jdbcUri);
-
icebergProperties.put(IcebergPropertiesConstants.ICEBERG_CATALOG_WAREHOUSE,
jdbcWarehouse);
-
icebergProperties.put(IcebergPropertiesConstants.ICEBERG_CATALOG_JDBC_USER,
jdbcUser);
-
icebergProperties.put(IcebergPropertiesConstants.ICEBERG_CATALOG_JDBC_PASSWORD,
jdbcPassword);
- }
-
- private void initRestProperties(
- Map<String, String> gravitinoProperties, HashMap<String, String>
icebergProperties) {
- String restUri =
-
gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI);
- Preconditions.checkArgument(
- StringUtils.isNotBlank(restUri),
- "Couldn't get "
- + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI
- + " from Iceberg Catalog properties");
- icebergProperties.put(
- IcebergPropertiesConstants.ICEBERG_CATALOG_TYPE,
- IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_REST);
-
icebergProperties.put(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI,
restUri);
- if (gravitinoProperties.containsKey(
- IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE)) {
- icebergProperties.put(
- IcebergPropertiesConstants.ICEBERG_CATALOG_WAREHOUSE,
-
gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE));
- }
- }
}