This is an automated email from the ASF dual-hosted git repository.
fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 56fc27a10 [4176]feat(iceberg) support multiple catalogs in the
iceberg rest catalog server (#4273)
56fc27a10 is described below
commit 56fc27a10a8b2e0460f37e1ebf218eab77bdda75
Author: theoryxu <[email protected]>
AuthorDate: Tue Aug 13 18:54:46 2024 +0800
[4176]feat(iceberg) support multiple catalogs in the iceberg rest catalog
server (#4273)
### What changes were proposed in this pull request?
design doc:
https://docs.google.com/document/d/1_XwsVHAzUjPCyOnJml69VkzhKI9veFvMo5OzGcKCEAk/edit
Use a config-based catalog provider to manage multiple catalogs.
### Why are the changes needed?
Fix: #4176
### Does this PR introduce _any_ user-facing change?
Yes.
### How was this patch tested?
1. Manual testing
2. Added unit tests
---------
Co-authored-by: theoryxu <[email protected]>
---
.../lakehouse/iceberg/IcebergConstants.java | 4 +
.../apache/gravitino/config/ConfigConstants.java | 3 +
docs/iceberg-rest-service.md | 45 ++++++++
.../gravitino/iceberg/common/IcebergConfig.java | 8 ++
.../ops/ConfigBasedIcebergTableOpsProvider.java | 90 ++++++++++++++++
.../iceberg/common/ops/IcebergTableOpsManager.java | 99 ++++++++++++++++++
.../common/ops/IcebergTableOpsProvider.java | 40 +++++++
.../TestConfigBasedIcebergTableOpsProvider.java | 115 +++++++++++++++++++++
.../common/ops/TestIcebergTableOpsManager.java | 61 +++++++++++
.../org/apache/gravitino/iceberg/RESTService.java | 12 +--
.../service/rest/IcebergNamespaceOperations.java | 52 ++++++----
.../service/rest/IcebergTableOperations.java | 41 +++++---
.../service/rest/IcebergTableRenameOperations.java | 14 +--
.../test/util/IcebergRESTServerManager.java | 1 -
.../ConfigBasedIcebergTableOpsProviderForTest.java | 29 ++++++
.../iceberg/service/rest/IcebergRestTestUtil.java | 15 ++-
.../iceberg/service/rest/IcebergTestBase.java | 11 +-
.../iceberg/service/rest/TestIcebergConfig.java | 8 +-
.../rest/TestIcebergNamespaceOperations.java | 6 +-
.../service/rest/TestIcebergTableOperations.java | 12 +--
20 files changed, 599 insertions(+), 67 deletions(-)
diff --git
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java
index 4a1f46fd6..6e4aae37a 100644
---
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java
+++
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java
@@ -63,4 +63,8 @@ public class IcebergConstants {
public static final String ICEBERG_METRICS_QUEUE_CAPACITY =
"metricsQueueCapacity";
public static final String GRAVITINO_ICEBERG_REST_SERVICE_NAME =
"iceberg-rest";
+
+ public static final String ICEBERG_REST_CATALOG_PROVIDER =
"catalog-provider";
+
+ public static final String GRAVITINO_DEFAULT_CATALOG =
"__gravitino_default_catalog";
}
diff --git
a/core/src/main/java/org/apache/gravitino/config/ConfigConstants.java
b/core/src/main/java/org/apache/gravitino/config/ConfigConstants.java
index fd6efc70f..5317b19a4 100644
--- a/core/src/main/java/org/apache/gravitino/config/ConfigConstants.java
+++ b/core/src/main/java/org/apache/gravitino/config/ConfigConstants.java
@@ -59,4 +59,7 @@ public final class ConfigConstants {
/** The version number for the 0.6.0 release. */
public static final String VERSION_0_6_0 = "0.6.0";
+
+ /** The version number for the 0.7.0 release. */
+ public static final String VERSION_0_7_0 = "0.7.0";
}
diff --git a/docs/iceberg-rest-service.md b/docs/iceberg-rest-service.md
index 6ae17f74b..c37ea39ac 100644
--- a/docs/iceberg-rest-service.md
+++ b/docs/iceberg-rest-service.md
@@ -147,6 +147,51 @@ If you have a JDBC Iceberg catalog prior, you must set
`catalog-backend-name` to
You must download the corresponding JDBC driver to the
`iceberg-rest-server/libs` directory.
:::
+#### Multi catalog support
+
+The Gravitino Iceberg REST server supports multiple catalogs and offers a
configuration-based catalog management system.
+
+| Configuration item | Description
|
Default value | Required | Since Version |
+|----------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------------------------|----------|---------------|
+| `gravitino.iceberg-rest.catalog-provider` | The catalog provider: either the
built-in short name `config-based-provider`, or the fully qualified class name of
a custom class that implements `IcebergTableOpsProvider`. For a custom provider,
add the corresponding jar file to the Iceberg REST service classpath directory. |
`config-based-provider` | No | 0.7.0 |
+
+When using the config-based catalog provider, you configure the default
catalog with `gravitino.iceberg-rest.<param name>=<value>`. For a specific
catalog, use the format `gravitino.iceberg-rest.catalog.<catalog
name>.<param name>=<value>`.
+
+For instance, you can configure three different catalogs, the default catalog
and the specific `hive_backend` and `jdbc_backend` catalogs separately.
+
+```text
+gravitino.iceberg-rest.catalog-backend = jdbc
+gravitino.iceberg-rest.uri = jdbc:postgresql://127.0.0.1:5432
+gravitino.iceberg-rest.warehouse =
hdfs://127.0.0.1:9000/user/hive/warehouse-postgresql
+...
+gravitino.iceberg-rest.catalog.hive_backend.catalog-backend = hive
+gravitino.iceberg-rest.catalog.hive_backend.uri = thrift://127.0.0.1:9084
+gravitino.iceberg-rest.catalog.hive_backend.warehouse =
/user/hive/warehouse-hive/
+...
+gravitino.iceberg-rest.catalog.jdbc_backend.catalog-backend = jdbc
+gravitino.iceberg-rest.catalog.jdbc_backend.uri = jdbc:mysql://127.0.0.1:3306/
+gravitino.iceberg-rest.catalog.jdbc_backend.warehouse =
hdfs://127.0.0.1:9000/user/hive/warehouse-mysql
+...
+```
+
+You can access different catalogs by setting the `prefix` to the specific
catalog name in the Iceberg REST client configuration. The default catalog will
be used if you do not specify a `prefix`. For instance, consider the case of
SparkSQL.
+
+```shell
+./bin/spark-sql -v \
+...
+--conf spark.sql.catalog.default_rest_catalog.type=rest \
+--conf
spark.sql.catalog.default_rest_catalog.uri=http://127.0.0.1:9001/iceberg/ \
+...
+--conf spark.sql.catalog.hive_backend_catalog.type=rest \
+--conf
spark.sql.catalog.hive_backend_catalog.uri=http://127.0.0.1:9001/iceberg/ \
+--conf spark.sql.catalog.hive_backend_catalog.prefix=hive_backend \
+...
+--conf spark.sql.catalog.jdbc_backend_catalog.type=rest \
+--conf
spark.sql.catalog.jdbc_backend_catalog.uri=http://127.0.0.1:9001/iceberg/ \
+--conf spark.sql.catalog.jdbc_backend_catalog.prefix=jdbc_backend \
+...
+```
+
### Other Apache Iceberg catalog properties
You can add other properties defined in [Iceberg catalog
properties](https://iceberg.apache.org/docs/1.5.2/configuration/#catalog-properties).
diff --git
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java
index 8bf9b86d0..b75fc88d6 100644
---
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java
+++
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java
@@ -158,6 +158,14 @@ public class IcebergConfig extends Config implements
OverwriteDefaultConfig {
.stringConf()
.create();
+ public static final ConfigEntry<String> ICEBERG_REST_CATALOG_PROVIDER =
+ new ConfigBuilder(IcebergConstants.ICEBERG_REST_CATALOG_PROVIDER)
+ .doc(
+ "Catalog provider class name, you can develop a class that
implements `IcebergTableOpsProvider` and add the corresponding jar file to the
Iceberg REST service classpath directory.")
+ .version(ConfigConstants.VERSION_0_7_0)
+ .stringConf()
+ .createWithDefault("config-based-provider");
+
public String getJdbcDriver() {
return get(JDBC_DRIVER);
}
diff --git
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/ConfigBasedIcebergTableOpsProvider.java
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/ConfigBasedIcebergTableOpsProvider.java
new file mode 100644
index 000000000..070e67ce1
--- /dev/null
+++
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/ConfigBasedIcebergTableOpsProvider.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.iceberg.common.ops;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
+import org.apache.gravitino.iceberg.common.IcebergConfig;
+import org.apache.gravitino.utils.MapUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An {@link IcebergTableOpsProvider} that builds every catalog from static configuration.
+ *
+ * <p>Property keys of the form {@code catalog.<catalog_name>.<param_name>} are grouped by catalog
+ * name; the entries selected by {@code MapUtils.getPrefixMap} for the prefix {@code
+ * catalog.<catalog_name>.} become the {@link IcebergConfig} of that catalog. The complete property
+ * map is additionally registered under {@link IcebergConstants#GRAVITINO_DEFAULT_CATALOG}.
+ *
+ * <p>For example, two catalogs named {@code jdbc_proxy} and {@code hive_proxy} are configured as:
+ *
+ * <pre>
+ * gravitino.iceberg-rest.catalog.jdbc_proxy.catalog-backend = jdbc
+ * gravitino.iceberg-rest.catalog.jdbc_proxy.uri = jdbc:mysql://{host}:{port}/{db}
+ * ...
+ * gravitino.iceberg-rest.catalog.hive_proxy.catalog-backend = hive
+ * gravitino.iceberg-rest.catalog.hive_proxy.uri = thrift://{host}:{port}
+ * ...
+ * </pre>
+ */
+public class ConfigBasedIcebergTableOpsProvider implements IcebergTableOpsProvider {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(ConfigBasedIcebergTableOpsProvider.class);
+
+  public static final String CONFIG_BASE_ICEBERG_TABLE_OPS_PROVIDER_NAME = "config-based-provider";
+
+  /** Catalog name to the configuration used to build that catalog; set by {@link #initialize}. */
+  @VisibleForTesting Map<String, IcebergConfig> catalogConfigs;
+
+  /**
+   * Builds the per-catalog configurations from {@code properties}.
+   *
+   * @param properties the provider properties; keys starting with {@code catalog.} describe named
+   *     catalogs, and the whole map describes the default catalog.
+   */
+  @Override
+  public void initialize(Map<String, String> properties) {
+    // Collect into an explicit HashMap: Collectors.toMap(k, v) gives no mutability guarantee, and
+    // the default catalog is added to the result right below.
+    Map<String, IcebergConfig> configs =
+        properties.keySet().stream()
+            .map(this::getCatalogName)
+            .filter(Optional::isPresent)
+            .map(Optional::get)
+            .distinct()
+            .collect(
+                Collectors.toMap(
+                    catalogName -> catalogName,
+                    catalogName ->
+                        new IcebergConfig(
+                            MapUtils.getPrefixMap(
+                                properties, String.format("catalog.%s.", catalogName))),
+                    // Names are distinct, so the merge function is never expected to run.
+                    (first, second) -> first,
+                    HashMap::new));
+    configs.put(IcebergConstants.GRAVITINO_DEFAULT_CATALOG, new IcebergConfig(properties));
+    this.catalogConfigs = configs;
+  }
+
+  /**
+   * Creates a new {@link IcebergTableOps} for the given catalog. No instance is cached here.
+   *
+   * @param catalogName the catalog name, or {@link IcebergConstants#GRAVITINO_DEFAULT_CATALOG}.
+   * @return a newly created {@link IcebergTableOps} built from the catalog's configuration.
+   * @throws RuntimeException if no catalog with that name was configured.
+   */
+  @Override
+  public IcebergTableOps getIcebergTableOps(String catalogName) {
+    IcebergConfig icebergConfig = this.catalogConfigs.get(catalogName);
+    if (icebergConfig == null) {
+      String errorMsg = String.format("%s can not match any catalog", catalogName);
+      LOG.warn(errorMsg);
+      throw new RuntimeException(errorMsg);
+    }
+    return new IcebergTableOps(icebergConfig);
+  }
+
+  /**
+   * Extracts the catalog name from a key of the form {@code catalog.<catalog_name>.<param_name>}.
+   *
+   * @param catalogConfigKey a property key.
+   * @return the catalog name, or empty if the key does not start with {@code catalog.} or is
+   *     malformed (too few segments, or an empty catalog name).
+   */
+  private Optional<String> getCatalogName(String catalogConfigKey) {
+    if (!catalogConfigKey.startsWith("catalog.")) {
+      return Optional.empty();
+    }
+    // Split once and reuse the segments instead of splitting the key twice.
+    String[] segments = catalogConfigKey.split("\\.");
+    // An empty name (e.g. "catalog..uri") could never be addressed by a client prefix, so it is
+    // rejected like any other malformed key.
+    if (segments.length < 3 || segments[1].isEmpty()) {
+      LOG.warn("{} format is illegal", catalogConfigKey);
+      return Optional.empty();
+    }
+    return Optional.of(segments[1]);
+  }
+}
diff --git
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergTableOpsManager.java
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergTableOpsManager.java
new file mode 100644
index 000000000..7b6270be6
--- /dev/null
+++
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergTableOpsManager.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.iceberg.common.ops;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.commons.lang.StringUtils;
+import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
+import org.apache.gravitino.iceberg.common.IcebergConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Resolves the {@link IcebergTableOps} for a request prefix and caches one instance per catalog.
+ *
+ * <p>The catalogs themselves come from an {@link IcebergTableOpsProvider}, selected by the {@link
+ * IcebergConfig#ICEBERG_REST_CATALOG_PROVIDER} property. The value is either a built-in short name
+ * or the class name of a provider with a no-argument constructor.
+ */
+public class IcebergTableOpsManager implements AutoCloseable {
+  public static final Logger LOG = LoggerFactory.getLogger(IcebergTableOpsManager.class);
+
+  /** Built-in provider short names mapped to their implementation class names. */
+  private static final ImmutableMap<String, String> ICEBERG_TABLE_OPS_PROVIDER_NAMES =
+      ImmutableMap.of(
+          ConfigBasedIcebergTableOpsProvider.CONFIG_BASE_ICEBERG_TABLE_OPS_PROVIDER_NAME,
+          ConfigBasedIcebergTableOpsProvider.class.getCanonicalName());
+
+  private final Cache<String, IcebergTableOps> icebergTableOpsCache;
+
+  private final IcebergTableOpsProvider provider;
+
+  /**
+   * Creates the configured provider and initializes it with the same properties.
+   *
+   * @param properties the Iceberg REST service properties.
+   */
+  public IcebergTableOpsManager(Map<String, String> properties) {
+    this.icebergTableOpsCache = Caffeine.newBuilder().build();
+    this.provider = createProvider(properties);
+    this.provider.initialize(properties);
+  }
+
+  /**
+   * Returns the cached {@link IcebergTableOps} for the catalog addressed by {@code rawPrefix},
+   * asking the provider to create it on first use.
+   *
+   * @param rawPrefix the path parameter passed by the Jetty handler; it is either blank (default
+   *     catalog) or matches {@code ([^/]*\/)}, i.e. the catalog name followed by a slash.
+   * @return the instance of IcebergTableOps.
+   * @throws IllegalArgumentException if a non-blank prefix does not end with a slash, or if it
+   *     names the reserved default catalog.
+   */
+  public IcebergTableOps getOps(String rawPrefix) {
+    String catalogName = getCatalogName(rawPrefix);
+    return icebergTableOpsCache.get(catalogName, provider::getIcebergTableOps);
+  }
+
+  /** Maps a raw prefix to a catalog name; a blank prefix selects the default catalog. */
+  private String getCatalogName(String rawPrefix) {
+    String prefix = shelling(rawPrefix);
+    // Clients must not address the default catalog through its reserved internal name.
+    Preconditions.checkArgument(
+        !IcebergConstants.GRAVITINO_DEFAULT_CATALOG.equals(prefix),
+        "%s is conflict with reserved key, please replace it",
+        prefix);
+    if (StringUtils.isBlank(prefix)) {
+      return IcebergConstants.GRAVITINO_DEFAULT_CATALOG;
+    }
+    return prefix;
+  }
+
+  /** Instantiates the provider named by the configuration through its no-argument constructor. */
+  private IcebergTableOpsProvider createProvider(Map<String, String> properties) {
+    String providerName =
+        (new IcebergConfig(properties)).get(IcebergConfig.ICEBERG_REST_CATALOG_PROVIDER);
+    // A value that is not a built-in short name is treated as a class name.
+    String className = ICEBERG_TABLE_OPS_PROVIDER_NAMES.getOrDefault(providerName, providerName);
+    LOG.info("Load Iceberg catalog provider: {}.", className);
+    try {
+      Class<?> providerClz = Class.forName(className);
+      return (IcebergTableOpsProvider) providerClz.getDeclaredConstructor().newInstance();
+    } catch (Exception e) {
+      // Name the class in the message so a misconfigured provider is easy to diagnose.
+      throw new RuntimeException("Failed to create Iceberg catalog provider: " + className, e);
+    }
+  }
+
+  /** Strips the trailing slash from a non-blank raw prefix; blank input is returned unchanged. */
+  private String shelling(String rawPrefix) {
+    if (StringUtils.isBlank(rawPrefix)) {
+      return rawPrefix;
+    } else {
+      // rawPrefix is a string matching ([^/]*/) which end with /
+      Preconditions.checkArgument(
+          rawPrefix.endsWith("/"), "rawPrefix %s format is illegal", rawPrefix);
+      return rawPrefix.substring(0, rawPrefix.length() - 1);
+    }
+  }
+
+  /**
+   * Closes every cached {@link IcebergTableOps} and then empties the cache.
+   *
+   * <p>Closing is best-effort: a failure on one catalog is logged and the remaining catalogs are
+   * still closed, so one broken backend cannot keep the others open during shutdown.
+   */
+  @Override
+  public void close() throws Exception {
+    // Invalidating alone only drops the references; the underlying catalogs must be closed too.
+    for (Map.Entry<String, IcebergTableOps> entry : icebergTableOpsCache.asMap().entrySet()) {
+      try {
+        entry.getValue().close();
+      } catch (Exception e) {
+        LOG.warn("Failed to close IcebergTableOps of catalog {}.", entry.getKey(), e);
+      }
+    }
+    icebergTableOpsCache.invalidateAll();
+  }
+}
diff --git
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergTableOpsProvider.java
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergTableOpsProvider.java
new file mode 100644
index 000000000..cda5ac20a
--- /dev/null
+++
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergTableOpsProvider.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.iceberg.common.ops;
+
+import java.util.Map;
+
+/**
+ * Defines how the Iceberg REST catalog server obtains the {@link IcebergTableOps} of an Iceberg
+ * catalog.
+ *
+ * <p>Implementations are instantiated by {@code IcebergTableOpsManager} through a no-argument
+ * constructor, and {@link #initialize(Map)} is called once before any catalog is requested.
+ */
+public interface IcebergTableOpsProvider {
+
+ /**
+ * Initializes the provider.
+ *
+ * @param properties the parameters for creating the provider, taken from the configurations
+ * whose prefix is 'gravitino.iceberg-rest.'
+ */
+ void initialize(Map<String, String> properties);
+
+ /**
+ * Returns the {@link IcebergTableOps} of the given catalog.
+ *
+ * @param catalogName the catalog name sent by the client; {@code IcebergTableOpsManager} passes
+ * the reserved default catalog name when the client sends no prefix.
+ * @return the instance of IcebergTableOps.
+ */
+ IcebergTableOps getIcebergTableOps(String catalogName);
+}
diff --git
a/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/ops/TestConfigBasedIcebergTableOpsProvider.java
b/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/ops/TestConfigBasedIcebergTableOpsProvider.java
new file mode 100644
index 000000000..c3c8c3fe8
--- /dev/null
+++
b/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/ops/TestConfigBasedIcebergTableOpsProvider.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.iceberg.common.ops;
+
+import com.google.common.collect.Maps;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
+import org.apache.gravitino.iceberg.common.IcebergConfig;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.inmemory.InMemoryCatalog;
+import org.apache.iceberg.jdbc.JdbcCatalog;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+public class TestConfigBasedIcebergTableOpsProvider {
+  // Unique H2 database location so that separate runs do not share state.
+  private static final String STORE_PATH =
+      "/tmp/gravitino_test_iceberg_jdbc_backend_" + UUID.randomUUID().toString().replace("-", "");
+
+  /**
+   * Configures a hive catalog, a jdbc catalog and the default (memory) catalog, then checks that
+   * the provider exposes the expected configuration and catalog implementation for each.
+   */
+  @Test
+  public void testValidIcebergTableOps() {
+    final String hiveName = "hive_backend";
+    final String jdbcName = "jdbc_backend";
+    final String defaultName = IcebergConstants.GRAVITINO_DEFAULT_CATALOG;
+    final String jdbcUri = String.format("jdbc:h2:%s;DB_CLOSE_DELAY=-1;MODE=MYSQL", STORE_PATH);
+
+    Map<String, String> properties = Maps.newHashMap();
+    // Named catalog backed by Hive.
+    properties.put("catalog.hive_backend.catalog-backend-name", hiveName);
+    properties.put("catalog.hive_backend.catalog-backend", "hive");
+    properties.put("catalog.hive_backend.uri", "thrift://127.0.0.1:9083");
+    properties.put("catalog.hive_backend.warehouse", "/tmp/usr/hive/warehouse");
+    // Named catalog backed by JDBC (H2).
+    properties.put("catalog.jdbc_backend.catalog-backend-name", jdbcName);
+    properties.put("catalog.jdbc_backend.catalog-backend", "jdbc");
+    properties.put("catalog.jdbc_backend.uri", jdbcUri);
+    properties.put("catalog.jdbc_backend.warehouse", "/tmp/usr/jdbc/warehouse");
+    properties.put("catalog.jdbc_backend.jdbc.password", "gravitino");
+    properties.put("catalog.jdbc_backend.jdbc.user", "gravitino");
+    properties.put("catalog.jdbc_backend.jdbc-driver", "org.h2.Driver");
+    properties.put("catalog.jdbc_backend.jdbc-initialize", "true");
+    // Default catalog: configured without the "catalog.<name>." prefix.
+    properties.put("catalog-backend-name", defaultName);
+    properties.put("catalog-backend", "memory");
+    properties.put("warehouse", "/tmp/");
+
+    ConfigBasedIcebergTableOpsProvider opsProvider = new ConfigBasedIcebergTableOpsProvider();
+    opsProvider.initialize(properties);
+
+    IcebergConfig hiveConf = opsProvider.catalogConfigs.get(hiveName);
+    IcebergConfig jdbcConf = opsProvider.catalogConfigs.get(jdbcName);
+    IcebergConfig defaultConf = opsProvider.catalogConfigs.get(defaultName);
+    IcebergTableOps hiveTableOps = opsProvider.getIcebergTableOps(hiveName);
+    IcebergTableOps jdbcTableOps = opsProvider.getIcebergTableOps(jdbcName);
+    IcebergTableOps defaultTableOps = opsProvider.getIcebergTableOps(defaultName);
+
+    // Hive catalog configuration.
+    Assertions.assertEquals(hiveName, hiveConf.get(IcebergConfig.CATALOG_BACKEND_NAME));
+    Assertions.assertEquals("hive", hiveConf.get(IcebergConfig.CATALOG_BACKEND));
+    Assertions.assertEquals("thrift://127.0.0.1:9083", hiveConf.get(IcebergConfig.CATALOG_URI));
+    Assertions.assertEquals(
+        "/tmp/usr/hive/warehouse", hiveConf.get(IcebergConfig.CATALOG_WAREHOUSE));
+
+    // JDBC catalog configuration.
+    Assertions.assertEquals(jdbcName, jdbcConf.get(IcebergConfig.CATALOG_BACKEND_NAME));
+    Assertions.assertEquals("jdbc", jdbcConf.get(IcebergConfig.CATALOG_BACKEND));
+    Assertions.assertEquals(
+        "/tmp/usr/jdbc/warehouse", jdbcConf.get(IcebergConfig.CATALOG_WAREHOUSE));
+    Assertions.assertEquals("org.h2.Driver", jdbcConf.get(IcebergConfig.JDBC_DRIVER));
+    Assertions.assertEquals(true, jdbcConf.get(IcebergConfig.JDBC_INIT_TABLES));
+
+    // Default catalog configuration.
+    Assertions.assertEquals(defaultName, defaultConf.get(IcebergConfig.CATALOG_BACKEND_NAME));
+    Assertions.assertEquals("memory", defaultConf.get(IcebergConfig.CATALOG_BACKEND));
+    Assertions.assertEquals("/tmp/", defaultConf.get(IcebergConfig.CATALOG_WAREHOUSE));
+
+    // Each ops instance wraps a catalog of the configured name and implementation.
+    Assertions.assertEquals(hiveName, hiveTableOps.catalog.name());
+    Assertions.assertTrue(hiveTableOps.catalog instanceof HiveCatalog);
+    Assertions.assertEquals(jdbcName, jdbcTableOps.catalog.name());
+    Assertions.assertTrue(jdbcTableOps.catalog instanceof JdbcCatalog);
+    Assertions.assertEquals(defaultName, defaultTableOps.catalog.name());
+    Assertions.assertTrue(defaultTableOps.catalog instanceof InMemoryCatalog);
+  }
+
+  /** A provider initialized without any named catalog rejects unknown and empty catalog names. */
+  @ParameterizedTest
+  @ValueSource(strings = {"", "not_match"})
+  public void testInvalidIcebergTableOps(String catalogName) {
+    ConfigBasedIcebergTableOpsProvider emptyProvider = new ConfigBasedIcebergTableOpsProvider();
+    emptyProvider.initialize(Maps.newHashMap());
+
+    Assertions.assertThrowsExactly(
+        RuntimeException.class, () -> emptyProvider.getIcebergTableOps(catalogName));
+  }
+}
diff --git
a/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/ops/TestIcebergTableOpsManager.java
b/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/ops/TestIcebergTableOpsManager.java
new file mode 100644
index 000000000..04ded71dd
--- /dev/null
+++
b/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/ops/TestIcebergTableOpsManager.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.iceberg.common.ops;
+
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.commons.lang.StringUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+public class TestIcebergTableOpsManager {
+
+  // Catalog name reported by the default catalog when no backend name is configured.
+  private static final String DEFAULT_CATALOG = "memory";
+
+  /**
+   * A blank prefix resolves to the default catalog; a prefix ending with a slash resolves to the
+   * catalog whose name is the prefix without that slash.
+   */
+  @ParameterizedTest
+  @ValueSource(strings = {"", "hello/", "\\\n\t\\\'/", "\u0024/", "\100/", "[_~/"})
+  public void testValidGetOps(String rawPrefix) throws Exception {
+    String prefix = rawPrefix;
+    if (!StringUtils.isBlank(rawPrefix)) {
+      prefix = rawPrefix.substring(0, rawPrefix.length() - 1);
+    }
+    Map<String, String> config = Maps.newHashMap();
+    config.put(String.format("catalog.%s.catalog-backend-name", prefix), prefix);
+
+    // The manager is AutoCloseable; close it so each parameterized run releases its catalogs.
+    try (IcebergTableOpsManager manager = new IcebergTableOpsManager(config)) {
+      IcebergTableOps ops = manager.getOps(rawPrefix);
+
+      // JUnit expects (expected, actual); the swapped order yields misleading failure messages.
+      if (StringUtils.isBlank(prefix)) {
+        Assertions.assertEquals(DEFAULT_CATALOG, ops.catalog.name());
+      } else {
+        Assertions.assertEquals(prefix, ops.catalog.name());
+      }
+    }
+  }
+
+  /** Prefixes without a trailing slash, and the reserved default catalog name, are rejected. */
+  @ParameterizedTest
+  @ValueSource(
+      strings = {"hello", "\\\n\t\\\'", "\u0024", "\100", "[_~", "__gravitino_default_catalog/"})
+  public void testInvalidGetOps(String rawPrefix) throws Exception {
+    Map<String, String> config = Maps.newHashMap();
+    try (IcebergTableOpsManager manager = new IcebergTableOpsManager(config)) {
+      Assertions.assertThrowsExactly(
+          IllegalArgumentException.class, () -> manager.getOps(rawPrefix));
+    }
+  }
+}
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java
index 0a1bf752d..dd3b9d9ab 100644
---
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java
@@ -23,7 +23,7 @@ import javax.servlet.Servlet;
import org.apache.gravitino.GravitinoEnv;
import org.apache.gravitino.auxiliary.GravitinoAuxiliaryService;
import org.apache.gravitino.iceberg.common.IcebergConfig;
-import org.apache.gravitino.iceberg.common.ops.IcebergTableOps;
+import org.apache.gravitino.iceberg.common.ops.IcebergTableOpsManager;
import org.apache.gravitino.iceberg.service.IcebergExceptionMapper;
import org.apache.gravitino.iceberg.service.IcebergObjectMapperProvider;
import org.apache.gravitino.iceberg.service.metrics.IcebergMetricsManager;
@@ -48,7 +48,7 @@ public class RESTService implements GravitinoAuxiliaryService
{
public static final String SERVICE_NAME = "iceberg-rest";
public static final String ICEBERG_SPEC = "/iceberg/*";
- private IcebergTableOps icebergTableOps;
+ private IcebergTableOpsManager icebergTableOpsManager;
private IcebergMetricsManager icebergMetricsManager;
private void initServer(IcebergConfig icebergConfig) {
@@ -66,13 +66,13 @@ public class RESTService implements
GravitinoAuxiliaryService {
new
HttpServerMetricsSource(MetricsSource.ICEBERG_REST_SERVER_METRIC_NAME, config,
server);
metricsSystem.register(httpServerMetricsSource);
- icebergTableOps = new IcebergTableOps(icebergConfig);
+ icebergTableOpsManager = new
IcebergTableOpsManager(icebergConfig.getAllConfig());
icebergMetricsManager = new IcebergMetricsManager(icebergConfig);
config.register(
new AbstractBinder() {
@Override
protected void configure() {
- bind(icebergTableOps).to(IcebergTableOps.class).ranked(1);
+
bind(icebergTableOpsManager).to(IcebergTableOpsManager.class).ranked(1);
bind(icebergMetricsManager).to(IcebergMetricsManager.class).ranked(1);
}
});
@@ -114,8 +114,8 @@ public class RESTService implements
GravitinoAuxiliaryService {
server.stop();
LOG.info("Iceberg REST service stopped");
}
- if (icebergTableOps != null) {
- icebergTableOps.close();
+ if (icebergTableOpsManager != null) {
+ icebergTableOpsManager.close();
}
if (icebergMetricsManager != null) {
icebergMetricsManager.close();
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergNamespaceOperations.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergNamespaceOperations.java
index b37fab80b..82b6250e6 100644
---
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergNamespaceOperations.java
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergNamespaceOperations.java
@@ -34,7 +34,7 @@ import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
-import org.apache.gravitino.iceberg.common.ops.IcebergTableOps;
+import org.apache.gravitino.iceberg.common.ops.IcebergTableOpsManager;
import org.apache.gravitino.iceberg.service.IcebergRestUtils;
import org.apache.gravitino.metrics.MetricNames;
import org.apache.iceberg.catalog.Namespace;
@@ -57,25 +57,27 @@ public class IcebergNamespaceOperations {
private static final Logger LOG =
LoggerFactory.getLogger(IcebergNamespaceOperations.class);
- private IcebergTableOps icebergTableOps;
+ private IcebergTableOpsManager icebergTableOpsManager;
@SuppressWarnings("UnusedVariable")
@Context
private HttpServletRequest httpRequest;
@Inject
- public IcebergNamespaceOperations(IcebergTableOps icebergTableOps) {
- this.icebergTableOps = icebergTableOps;
+ public IcebergNamespaceOperations(IcebergTableOpsManager
icebergTableOpsManager) {
+ this.icebergTableOpsManager = icebergTableOpsManager;
}
@GET
@Produces(MediaType.APPLICATION_JSON)
@Timed(name = "list-namespace." + MetricNames.HTTP_PROCESS_DURATION,
absolute = true)
@ResponseMetered(name = "list-namespace", absolute = true)
- public Response listNamespaces(@DefaultValue("") @QueryParam("parent")
String parent) {
+ public Response listNamespaces(
+ @DefaultValue("") @QueryParam("parent") String parent,
@PathParam("prefix") String prefix) {
Namespace parentNamespace =
parent.isEmpty() ? Namespace.empty() :
RESTUtil.decodeNamespace(parent);
- ListNamespacesResponse response =
icebergTableOps.listNamespace(parentNamespace);
+ ListNamespacesResponse response =
+ icebergTableOpsManager.getOps(prefix).listNamespace(parentNamespace);
return IcebergRestUtils.ok(response);
}
@@ -84,9 +86,10 @@ public class IcebergNamespaceOperations {
@Produces(MediaType.APPLICATION_JSON)
@Timed(name = "load-namespace." + MetricNames.HTTP_PROCESS_DURATION,
absolute = true)
@ResponseMetered(name = "load-namespace", absolute = true)
- public Response loadNamespace(@PathParam("namespace") String namespace) {
+ public Response loadNamespace(
+ @PathParam("prefix") String prefix, @PathParam("namespace") String
namespace) {
GetNamespaceResponse getNamespaceResponse =
- icebergTableOps.loadNamespace(RESTUtil.decodeNamespace(namespace));
+
icebergTableOpsManager.getOps(prefix).loadNamespace(RESTUtil.decodeNamespace(namespace));
return IcebergRestUtils.ok(getNamespaceResponse);
}
@@ -95,10 +98,11 @@ public class IcebergNamespaceOperations {
@Produces(MediaType.APPLICATION_JSON)
@Timed(name = "drop-namespace." + MetricNames.HTTP_PROCESS_DURATION,
absolute = true)
@ResponseMetered(name = "drop-namespace", absolute = true)
- public Response dropNamespace(@PathParam("namespace") String namespace) {
+ public Response dropNamespace(
+ @PathParam("prefix") String prefix, @PathParam("namespace") String
namespace) {
// todo check if table exists in namespace after table ops is added
- LOG.info("Drop Iceberg namespace: {}", namespace);
- icebergTableOps.dropNamespace(RESTUtil.decodeNamespace(namespace));
+ LOG.info("Drop Iceberg namespace: {}, prefix: {}", namespace, prefix);
+
icebergTableOpsManager.getOps(prefix).dropNamespace(RESTUtil.decodeNamespace(namespace));
return IcebergRestUtils.noContent();
}
@@ -106,9 +110,11 @@ public class IcebergNamespaceOperations {
@Produces(MediaType.APPLICATION_JSON)
@Timed(name = "create-namespace." + MetricNames.HTTP_PROCESS_DURATION,
absolute = true)
@ResponseMetered(name = "create-namespace", absolute = true)
- public Response createNamespace(CreateNamespaceRequest namespaceRequest) {
- LOG.info("Create Iceberg namespace: {}", namespaceRequest);
- CreateNamespaceResponse response =
icebergTableOps.createNamespace(namespaceRequest);
+ public Response createNamespace(
+ @PathParam("prefix") String prefix, CreateNamespaceRequest
namespaceRequest) {
+ LOG.info("Create Iceberg namespace: {}, prefix: {}", namespaceRequest,
prefix);
+ CreateNamespaceResponse response =
+
icebergTableOpsManager.getOps(prefix).createNamespace(namespaceRequest);
return IcebergRestUtils.ok(response);
}
@@ -118,10 +124,14 @@ public class IcebergNamespaceOperations {
@Timed(name = "update-namespace." + MetricNames.HTTP_PROCESS_DURATION,
absolute = true)
@ResponseMetered(name = "update-namespace", absolute = true)
public Response updateNamespace(
- @PathParam("namespace") String namespace,
UpdateNamespacePropertiesRequest request) {
- LOG.info("Update Iceberg namespace: {}, request: {}", namespace, request);
+ @PathParam("prefix") String prefix,
+ @PathParam("namespace") String namespace,
+ UpdateNamespacePropertiesRequest request) {
+ LOG.info("Update Iceberg namespace: {}, request: {}, prefix: {}",
namespace, request, prefix);
UpdateNamespacePropertiesResponse response =
-
icebergTableOps.updateNamespaceProperties(RESTUtil.decodeNamespace(namespace),
request);
+ icebergTableOpsManager
+ .getOps(prefix)
+ .updateNamespaceProperties(RESTUtil.decodeNamespace(namespace),
request);
return IcebergRestUtils.ok(response);
}
@@ -131,10 +141,14 @@ public class IcebergNamespaceOperations {
@Timed(name = "register-table." + MetricNames.HTTP_PROCESS_DURATION,
absolute = true)
@ResponseMetered(name = "register-table", absolute = true)
public Response registerTable(
- @PathParam("namespace") String namespace, RegisterTableRequest request) {
+ @PathParam("prefix") String prefix,
+ @PathParam("namespace") String namespace,
+ RegisterTableRequest request) {
LOG.info("Register table, namespace: {}, request: {}", namespace, request);
LoadTableResponse response =
- icebergTableOps.registerTable(RESTUtil.decodeNamespace(namespace),
request);
+ icebergTableOpsManager
+ .getOps(prefix)
+ .registerTable(RESTUtil.decodeNamespace(namespace), request);
return IcebergRestUtils.ok(response);
}
}
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java
index 0b35f45c7..f6f6042a5 100644
---
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java
@@ -37,7 +37,7 @@ import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
-import org.apache.gravitino.iceberg.common.ops.IcebergTableOps;
+import org.apache.gravitino.iceberg.common.ops.IcebergTableOpsManager;
import org.apache.gravitino.iceberg.service.IcebergObjectMapper;
import org.apache.gravitino.iceberg.service.IcebergRestUtils;
import org.apache.gravitino.iceberg.service.metrics.IcebergMetricsManager;
@@ -57,7 +57,7 @@ public class IcebergTableOperations {
private static final Logger LOG =
LoggerFactory.getLogger(IcebergTableOperations.class);
- private IcebergTableOps icebergTableOps;
+ private IcebergTableOpsManager icebergTableOpsManager;
private IcebergMetricsManager icebergMetricsManager;
private ObjectMapper icebergObjectMapper;
@@ -68,8 +68,8 @@ public class IcebergTableOperations {
@Inject
public IcebergTableOperations(
- IcebergTableOps icebergTableOps, IcebergMetricsManager
icebergMetricsManager) {
- this.icebergTableOps = icebergTableOps;
+ IcebergTableOpsManager icebergTableOpsManager, IcebergMetricsManager
icebergMetricsManager) {
+ this.icebergTableOpsManager = icebergTableOpsManager;
this.icebergObjectMapper = IcebergObjectMapper.getInstance();
this.icebergMetricsManager = icebergMetricsManager;
}
@@ -78,8 +78,10 @@ public class IcebergTableOperations {
@Produces(MediaType.APPLICATION_JSON)
@Timed(name = "list-table." + MetricNames.HTTP_PROCESS_DURATION, absolute =
true)
@ResponseMetered(name = "list-table", absolute = true)
- public Response listTable(@PathParam("namespace") String namespace) {
- return
IcebergRestUtils.ok(icebergTableOps.listTable(RESTUtil.decodeNamespace(namespace)));
+ public Response listTable(
+ @PathParam("prefix") String prefix, @PathParam("namespace") String
namespace) {
+ return IcebergRestUtils.ok(
+
icebergTableOpsManager.getOps(prefix).listTable(RESTUtil.decodeNamespace(namespace)));
}
@POST
@@ -87,13 +89,17 @@ public class IcebergTableOperations {
@Timed(name = "create-table." + MetricNames.HTTP_PROCESS_DURATION, absolute
= true)
@ResponseMetered(name = "create-table", absolute = true)
public Response createTable(
- @PathParam("namespace") String namespace, CreateTableRequest
createTableRequest) {
+ @PathParam("prefix") String prefix,
+ @PathParam("namespace") String namespace,
+ CreateTableRequest createTableRequest) {
LOG.info(
"Create Iceberg table, namespace: {}, create table request: {}",
namespace,
createTableRequest);
return IcebergRestUtils.ok(
- icebergTableOps.createTable(RESTUtil.decodeNamespace(namespace),
createTableRequest));
+ icebergTableOpsManager
+ .getOps(prefix)
+ .createTable(RESTUtil.decodeNamespace(namespace),
createTableRequest));
}
@POST
@@ -102,6 +108,7 @@ public class IcebergTableOperations {
@Timed(name = "update-table." + MetricNames.HTTP_PROCESS_DURATION, absolute
= true)
@ResponseMetered(name = "update-table", absolute = true)
public Response updateTable(
+ @PathParam("prefix") String prefix,
@PathParam("namespace") String namespace,
@PathParam("table") String table,
UpdateTableRequest updateTableRequest) {
@@ -114,7 +121,8 @@ public class IcebergTableOperations {
}
TableIdentifier tableIdentifier =
TableIdentifier.of(RESTUtil.decodeNamespace(namespace), table);
- return IcebergRestUtils.ok(icebergTableOps.updateTable(tableIdentifier,
updateTableRequest));
+ return IcebergRestUtils.ok(
+ icebergTableOpsManager.getOps(prefix).updateTable(tableIdentifier,
updateTableRequest));
}
@DELETE
@@ -123,6 +131,7 @@ public class IcebergTableOperations {
@Timed(name = "drop-table." + MetricNames.HTTP_PROCESS_DURATION, absolute =
true)
@ResponseMetered(name = "drop-table", absolute = true)
public Response dropTable(
+ @PathParam("prefix") String prefix,
@PathParam("namespace") String namespace,
@PathParam("table") String table,
@DefaultValue("false") @QueryParam("purgeRequested") boolean
purgeRequested) {
@@ -134,9 +143,9 @@ public class IcebergTableOperations {
TableIdentifier tableIdentifier =
TableIdentifier.of(RESTUtil.decodeNamespace(namespace), table);
if (purgeRequested) {
- icebergTableOps.purgeTable(tableIdentifier);
+ icebergTableOpsManager.getOps(prefix).purgeTable(tableIdentifier);
} else {
- icebergTableOps.dropTable(tableIdentifier);
+ icebergTableOpsManager.getOps(prefix).dropTable(tableIdentifier);
}
return IcebergRestUtils.noContent();
}
@@ -147,13 +156,14 @@ public class IcebergTableOperations {
@Timed(name = "load-table." + MetricNames.HTTP_PROCESS_DURATION, absolute =
true)
@ResponseMetered(name = "load-table", absolute = true)
public Response loadTable(
+ @PathParam("prefix") String prefix,
@PathParam("namespace") String namespace,
@PathParam("table") String table,
@DefaultValue("all") @QueryParam("snapshots") String snapshots) {
// todo support snapshots
TableIdentifier tableIdentifier =
TableIdentifier.of(RESTUtil.decodeNamespace(namespace), table);
- return IcebergRestUtils.ok(icebergTableOps.loadTable(tableIdentifier));
+ return
IcebergRestUtils.ok(icebergTableOpsManager.getOps(prefix).loadTable(tableIdentifier));
}
@HEAD
@@ -162,10 +172,12 @@ public class IcebergTableOperations {
@Timed(name = "table-exists." + MetricNames.HTTP_PROCESS_DURATION, absolute
= true)
@ResponseMetered(name = "table-exits", absolute = true)
public Response tableExists(
- @PathParam("namespace") String namespace, @PathParam("table") String
table) {
+ @PathParam("prefix") String prefix,
+ @PathParam("namespace") String namespace,
+ @PathParam("table") String table) {
TableIdentifier tableIdentifier =
TableIdentifier.of(RESTUtil.decodeNamespace(namespace), table);
- if (icebergTableOps.tableExists(tableIdentifier)) {
+ if (icebergTableOpsManager.getOps(prefix).tableExists(tableIdentifier)) {
return IcebergRestUtils.okWithoutContent();
} else {
return IcebergRestUtils.notExists();
@@ -178,6 +190,7 @@ public class IcebergTableOperations {
@Timed(name = "report-table-metrics." + MetricNames.HTTP_PROCESS_DURATION,
absolute = true)
@ResponseMetered(name = "report-table-metrics", absolute = true)
public Response reportTableMetrics(
+ @PathParam("prefix") String prefix,
@PathParam("namespace") String namespace,
@PathParam("table") String table,
ReportMetricsRequest request) {
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableRenameOperations.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableRenameOperations.java
index 2b8585ba6..441ce0551 100644
---
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableRenameOperations.java
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableRenameOperations.java
@@ -25,11 +25,12 @@ import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
-import org.apache.gravitino.iceberg.common.ops.IcebergTableOps;
+import org.apache.gravitino.iceberg.common.ops.IcebergTableOpsManager;
import org.apache.gravitino.iceberg.service.IcebergRestUtils;
import org.apache.gravitino.metrics.MetricNames;
import org.apache.iceberg.rest.requests.RenameTableRequest;
@@ -43,19 +44,20 @@ public class IcebergTableRenameOperations {
@Context
private HttpServletRequest httpRequest;
- private IcebergTableOps icebergTableOps;
+ private IcebergTableOpsManager icebergTableOpsManager;
@Inject
- public IcebergTableRenameOperations(IcebergTableOps icebergTableOps) {
- this.icebergTableOps = icebergTableOps;
+ public IcebergTableRenameOperations(IcebergTableOpsManager
icebergTableOpsManager) {
+ this.icebergTableOpsManager = icebergTableOpsManager;
}
@POST
@Produces(MediaType.APPLICATION_JSON)
@Timed(name = "rename-table." + MetricNames.HTTP_PROCESS_DURATION, absolute
= true)
@ResponseMetered(name = "rename-table", absolute = true)
- public Response renameTable(RenameTableRequest renameTableRequest) {
- icebergTableOps.renameTable(renameTableRequest);
+ public Response renameTable(
+ @PathParam("prefix") String prefix, RenameTableRequest
renameTableRequest) {
+ icebergTableOpsManager.getOps(prefix).renameTable(renameTableRequest);
return IcebergRestUtils.okWithoutContent();
}
}
diff --git
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/util/IcebergRESTServerManager.java
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/util/IcebergRESTServerManager.java
index 6e2aa5c28..c70888d7b 100644
---
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/util/IcebergRESTServerManager.java
+++
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/util/IcebergRESTServerManager.java
@@ -123,7 +123,6 @@ public abstract class IcebergRESTServerManager {
String.valueOf(RESTUtils.findAvailablePort(2000, 3000)));
configMap.putAll(customConfigs);
-
ITUtils.rewriteConfigFile(configTempFileName, configFileName, configMap);
}
diff --git
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/ConfigBasedIcebergTableOpsProviderForTest.java
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/ConfigBasedIcebergTableOpsProviderForTest.java
new file mode 100644
index 000000000..6a9375615
--- /dev/null
+++
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/ConfigBasedIcebergTableOpsProviderForTest.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.iceberg.service.rest;
+
+import
org.apache.gravitino.iceberg.common.ops.ConfigBasedIcebergTableOpsProvider;
+import org.apache.gravitino.iceberg.common.ops.IcebergTableOps;
+
+public class ConfigBasedIcebergTableOpsProviderForTest extends
ConfigBasedIcebergTableOpsProvider {
+ @Override
+ public IcebergTableOps getIcebergTableOps(String prefix) {
+ return new IcebergTableOpsForTest();
+ }
+}
diff --git
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java
index 0e3d29e49..e7b19e0af 100644
---
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java
+++
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java
@@ -19,10 +19,13 @@
package org.apache.gravitino.iceberg.service.rest;
+import com.google.common.collect.Maps;
+import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
import org.apache.gravitino.iceberg.common.IcebergConfig;
-import org.apache.gravitino.iceberg.common.ops.IcebergTableOps;
+import org.apache.gravitino.iceberg.common.ops.IcebergTableOpsManager;
import org.apache.gravitino.iceberg.service.IcebergExceptionMapper;
import org.apache.gravitino.iceberg.service.IcebergObjectMapperProvider;
import org.apache.gravitino.iceberg.service.metrics.IcebergMetricsManager;
@@ -66,13 +69,19 @@ public class IcebergRestTestUtil {
}
if (bindIcebergTableOps) {
- IcebergTableOps icebergTableOps = new IcebergTableOpsForTest();
+ Map<String, String> catalogConf = Maps.newHashMap();
+ catalogConf.put(String.format("catalog.%s.catalog-backend-name",
PREFIX), PREFIX);
+ catalogConf.put(
+ IcebergConstants.ICEBERG_REST_CATALOG_PROVIDER,
+ ConfigBasedIcebergTableOpsProviderForTest.class.getName());
+ IcebergTableOpsManager icebergTableOpsManager = new
IcebergTableOpsManager(catalogConf);
+
IcebergMetricsManager icebergMetricsManager = new
IcebergMetricsManager(new IcebergConfig());
resourceConfig.register(
new AbstractBinder() {
@Override
protected void configure() {
- bind(icebergTableOps).to(IcebergTableOps.class).ranked(2);
+
bind(icebergTableOpsManager).to(IcebergTableOpsManager.class).ranked(2);
bind(icebergMetricsManager).to(IcebergMetricsManager.class).ranked(2);
}
});
diff --git
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergTestBase.java
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergTestBase.java
index ee672bf09..7d1d80b54 100644
---
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergTestBase.java
+++
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergTestBase.java
@@ -27,6 +27,7 @@ import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Application;
import javax.ws.rs.core.MediaType;
import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.iceberg.service.IcebergObjectMapperProvider;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.test.JerseyTest;
@@ -38,7 +39,7 @@ public class IcebergTestBase extends JerseyTest {
return new ResourceConfig();
}
- private boolean urlPathWithPrefix = false;
+ private String urlPathPrefix = "";
public Invocation.Builder getRenameTableClientBuilder() {
return getIcebergClientBuilder(IcebergRestTestUtil.RENAME_TABLE_PATH,
Optional.empty());
@@ -109,8 +110,8 @@ public class IcebergTestBase extends JerseyTest {
public Invocation.Builder getIcebergClientBuilder(
String path, Optional<Map<String, String>> queryParam) {
- if (urlPathWithPrefix) {
- path = injectPrefixToPath(path, IcebergRestTestUtil.PREFIX);
+ if (!StringUtils.isBlank(urlPathPrefix)) {
+ path = injectPrefixToPath(path, urlPathPrefix);
}
WebTarget target = target(path);
if (queryParam.isPresent()) {
@@ -126,7 +127,7 @@ public class IcebergTestBase extends JerseyTest {
.accept(MediaType.APPLICATION_JSON_TYPE);
}
- public void setUrlPathWithPrefix(boolean urlPathWithPrefix) {
- this.urlPathWithPrefix = urlPathWithPrefix;
+ public void setUrlPathWithPrefix(String urlPathPrefix) {
+ this.urlPathPrefix = urlPathPrefix;
}
}
diff --git
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergConfig.java
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergConfig.java
index b2f9e09ef..1116fc0bb 100644
---
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergConfig.java
+++
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergConfig.java
@@ -32,13 +32,13 @@ public class TestIcebergConfig extends IcebergTestBase {
@Override
protected Application configure() {
- return
IcebergRestTestUtil.getIcebergResourceConfig(IcebergConfigOperations.class,
false);
+ return
IcebergRestTestUtil.getIcebergResourceConfig(IcebergConfigOperations.class);
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testConfig(boolean withPrefix) {
- setUrlPathWithPrefix(withPrefix);
+ @ValueSource(strings = {"", IcebergRestTestUtil.PREFIX})
+ public void testConfig(String prefix) {
+ setUrlPathWithPrefix(prefix);
Response resp = getConfigClientBuilder().get();
Assertions.assertEquals(Response.Status.OK.getStatusCode(),
resp.getStatus());
Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE,
resp.getMediaType());
diff --git
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergNamespaceOperations.java
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergNamespaceOperations.java
index 251f9e907..6049a153e 100644
---
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergNamespaceOperations.java
+++
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergNamespaceOperations.java
@@ -209,9 +209,9 @@ public class TestIcebergNamespaceOperations extends
IcebergTestBase {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- void testListNamespace(boolean withPrefix) {
- setUrlPathWithPrefix(withPrefix);
+ @ValueSource(strings = {"", IcebergRestTestUtil.PREFIX})
+ void testListNamespace(String prefix) {
+ setUrlPathWithPrefix(prefix);
dropAllExistingNamespace();
verifyListNamespaceSucc(Optional.empty(), Arrays.asList());
diff --git
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergTableOperations.java
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergTableOperations.java
index e2d49d985..6037302b8 100644
---
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergTableOperations.java
+++
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergTableOperations.java
@@ -260,9 +260,9 @@ public class TestIcebergTableOperations extends
TestIcebergNamespaceOperations {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- void testListTables(boolean withPrefix) {
- setUrlPathWithPrefix(withPrefix);
+ @ValueSource(strings = {"", IcebergRestTestUtil.PREFIX})
+ void testListTables(String prefix) {
+ setUrlPathWithPrefix(prefix);
verifyListTableFail(404);
verifyCreateNamespaceSucc(IcebergRestTestUtil.TEST_NAMESPACE_NAME);
@@ -283,9 +283,9 @@ public class TestIcebergTableOperations extends
TestIcebergNamespaceOperations {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- void testRenameTable(boolean withPrefix) {
- setUrlPathWithPrefix(withPrefix);
+ @ValueSource(strings = {"", IcebergRestTestUtil.PREFIX})
+ void testRenameTable(String prefix) {
+ setUrlPathWithPrefix(prefix);
// namespace does not exist
verifyRenameTableFail("rename_foo1", "rename_foo3", 404);