This is an automated email from the ASF dual-hosted git repository.

fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 56fc27a10  [4176]feat(iceberg) support multiple catalogs in the 
iceberg rest catalog server (#4273)
56fc27a10 is described below

commit 56fc27a10a8b2e0460f37e1ebf218eab77bdda75
Author: theoryxu <[email protected]>
AuthorDate: Tue Aug 13 18:54:46 2024 +0800

     [4176]feat(iceberg) support multiple catalogs in the iceberg rest catalog 
server (#4273)
    
    ### What changes were proposed in this pull request?
    
    design doc:
    
https://docs.google.com/document/d/1_XwsVHAzUjPCyOnJml69VkzhKI9veFvMo5OzGcKCEAk/edit
    
    use config based catalog provider to manage multi catalogs
    
    ### Why are the changes needed?
    
    Fix: #4176
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes.
    
    ### How was this patch tested?
    
    1. Manually test
    4. Add UT
    
    ---------
    
    Co-authored-by: theoryxu <[email protected]>
---
 .../lakehouse/iceberg/IcebergConstants.java        |   4 +
 .../apache/gravitino/config/ConfigConstants.java   |   3 +
 docs/iceberg-rest-service.md                       |  45 ++++++++
 .../gravitino/iceberg/common/IcebergConfig.java    |   8 ++
 .../ops/ConfigBasedIcebergTableOpsProvider.java    |  90 ++++++++++++++++
 .../iceberg/common/ops/IcebergTableOpsManager.java |  99 ++++++++++++++++++
 .../common/ops/IcebergTableOpsProvider.java        |  40 +++++++
 .../TestConfigBasedIcebergTableOpsProvider.java    | 115 +++++++++++++++++++++
 .../common/ops/TestIcebergTableOpsManager.java     |  61 +++++++++++
 .../org/apache/gravitino/iceberg/RESTService.java  |  12 +--
 .../service/rest/IcebergNamespaceOperations.java   |  52 ++++++----
 .../service/rest/IcebergTableOperations.java       |  41 +++++---
 .../service/rest/IcebergTableRenameOperations.java |  14 +--
 .../test/util/IcebergRESTServerManager.java        |   1 -
 .../ConfigBasedIcebergTableOpsProviderForTest.java |  29 ++++++
 .../iceberg/service/rest/IcebergRestTestUtil.java  |  15 ++-
 .../iceberg/service/rest/IcebergTestBase.java      |  11 +-
 .../iceberg/service/rest/TestIcebergConfig.java    |   8 +-
 .../rest/TestIcebergNamespaceOperations.java       |   6 +-
 .../service/rest/TestIcebergTableOperations.java   |  12 +--
 20 files changed, 599 insertions(+), 67 deletions(-)

diff --git 
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java
 
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java
index 4a1f46fd6..6e4aae37a 100644
--- 
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java
+++ 
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java
@@ -63,4 +63,8 @@ public class IcebergConstants {
   public static final String ICEBERG_METRICS_QUEUE_CAPACITY = 
"metricsQueueCapacity";
 
   public static final String GRAVITINO_ICEBERG_REST_SERVICE_NAME = 
"iceberg-rest";
+
+  public static final String ICEBERG_REST_CATALOG_PROVIDER = 
"catalog-provider";
+
+  public static final String GRAVITINO_DEFAULT_CATALOG = 
"__gravitino_default_catalog";
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/config/ConfigConstants.java 
b/core/src/main/java/org/apache/gravitino/config/ConfigConstants.java
index fd6efc70f..5317b19a4 100644
--- a/core/src/main/java/org/apache/gravitino/config/ConfigConstants.java
+++ b/core/src/main/java/org/apache/gravitino/config/ConfigConstants.java
@@ -59,4 +59,7 @@ public final class ConfigConstants {
 
   /** The version number for the 0.6.0 release. */
   public static final String VERSION_0_6_0 = "0.6.0";
+
+  /** The version number for the 0.7.0 release. */
+  public static final String VERSION_0_7_0 = "0.7.0";
 }
diff --git a/docs/iceberg-rest-service.md b/docs/iceberg-rest-service.md
index 6ae17f74b..c37ea39ac 100644
--- a/docs/iceberg-rest-service.md
+++ b/docs/iceberg-rest-service.md
@@ -147,6 +147,51 @@ If you have a JDBC Iceberg catalog prior, you must set 
`catalog-backend-name` to
 You must download the corresponding JDBC driver to the 
`iceberg-rest-server/libs` directory.
 :::
 
+#### Multi catalog support
+
+The Gravitino Iceberg REST server supports multiple catalogs and offers a 
configuration-based catalog management system.
+
+| Configuration item                           | Description                   
                                                                                
                                                                       | 
Default value               | Required | Since Version |
+|----------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------------------------|----------|---------------|
+| `gravitino.iceberg-rest.catalog-provider`    | Catalog provider class name, 
you can develop a class that implements `IcebergTableOpsProvider` and add the 
corresponding jar file to the Iceberg REST service classpath directory.   | 
`config-based-provider`     | No       | 0.7.0         |
+
+When using a config-based catalog provider, you can configure the default 
catalog with `gravitino.iceberg-rest.catalog.<param name>=<value>`. For 
specific catalogs, use the format `gravitino.iceberg-rest.catalog.<catalog 
name>.<param name>=<value>`.
+
+For instance, you can configure three different catalogs, the default catalog 
and the specific `hive_backend` and `jdbc_backend` catalogs separately.
+
+```text
+gravitino.iceberg-rest.catalog-backend = jdbc
+gravitino.iceberg-rest.uri = jdbc:postgresql://127.0.0.1:5432
+gravitino.iceberg-rest.warehouse = 
hdfs://127.0.0.1:9000/user/hive/warehouse-postgresql
+...
+gravitino.iceberg-rest.catalog.hive_backend.catalog-backend = hive
+gravitino.iceberg-rest.catalog.hive_backend.uri = thrift://127.0.0.1:9084
+gravitino.iceberg-rest.catalog.hive_backend.warehouse = 
/user/hive/warehouse-hive/
+...
+gravitino.iceberg-rest.catalog.jdbc_backend.catalog-backend = jdbc
+gravitino.iceberg-rest.catalog.jdbc_backend.uri = jdbc:mysql://127.0.0.1:3306/
+gravitino.iceberg-rest.catalog.jdbc_backend.warehouse = 
hdfs://127.0.0.1:9000/user/hive/warehouse-mysql
+...
+```
+
+You can access different catalogs by setting the `prefix` to the specific 
catalog name in the Iceberg REST client configuration. The default catalog will 
be used if you do not specify a `prefix`. For instance, consider the case of 
SparkSQL.
+
+```shell
+./bin/spark-sql -v \
+...
+--conf spark.sql.catalog.default_rest_catalog.type=rest  \
+--conf 
spark.sql.catalog.default_rest_catalog.uri=http://127.0.0.1:9001/iceberg/ \
+...
+--conf spark.sql.catalog.hive_backend_catalog.type=rest  \
+--conf 
spark.sql.catalog.hive_backend_catalog.uri=http://127.0.0.1:9001/iceberg/ \
+--conf spark.sql.catalog.hive_backend_catalog.prefix=hive_backend \
+...
+--conf spark.sql.catalog.jdbc_backend_catalog.type=rest  \
+--conf 
spark.sql.catalog.jdbc_backend_catalog.uri=http://127.0.0.1:9001/iceberg/ \
+--conf spark.sql.catalog.jdbc_backend_catalog.prefix=jdbc_backend \
+...
+```
+
 ### Other Apache Iceberg catalog properties
 
 You can add other properties defined in [Iceberg catalog 
properties](https://iceberg.apache.org/docs/1.5.2/configuration/#catalog-properties).
diff --git 
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java
 
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java
index 8bf9b86d0..b75fc88d6 100644
--- 
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java
+++ 
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java
@@ -158,6 +158,14 @@ public class IcebergConfig extends Config implements 
OverwriteDefaultConfig {
           .stringConf()
           .create();
 
+  public static final ConfigEntry<String> ICEBERG_REST_CATALOG_PROVIDER =
+      new ConfigBuilder(IcebergConstants.ICEBERG_REST_CATALOG_PROVIDER)
+          .doc(
+              "Catalog provider class name, you can develop a class that 
implements `IcebergTableOpsProvider` and add the corresponding jar file to the 
Iceberg REST service classpath directory.")
+          .version(ConfigConstants.VERSION_0_7_0)
+          .stringConf()
+          .createWithDefault("config-based-provider");
+
   public String getJdbcDriver() {
     return get(JDBC_DRIVER);
   }
diff --git 
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/ConfigBasedIcebergTableOpsProvider.java
 
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/ConfigBasedIcebergTableOpsProvider.java
new file mode 100644
index 000000000..070e67ce1
--- /dev/null
+++ 
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/ConfigBasedIcebergTableOpsProvider.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.iceberg.common.ops;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
+import org.apache.gravitino.iceberg.common.IcebergConfig;
+import org.apache.gravitino.utils.MapUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This provider use configs to support multiple catalogs.
+ *
+ * <p>For example, there are two different catalogs: jdbc_proxy, hive_proxy 
The config is like:
+ *
+ * <p>gravitino.iceberg-rest.catalog.jdbc_proxy.catalog-backend = jdbc
+ * gravitino.iceberg-rest.catalog.jdbc_proxy.uri = 
jdbc:mysql://{host}:{port}/{db} ...
+ * gravitino.iceberg-rest.catalog.hive_proxy.catalog-backend = hive
+ * gravitino.iceberg-rest.catalog.hive_proxy.uri = thrift://{host}:{port} ...
+ */
+public class ConfigBasedIcebergTableOpsProvider implements 
IcebergTableOpsProvider {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(ConfigBasedIcebergTableOpsProvider.class);
+
+  public static final String CONFIG_BASE_ICEBERG_TABLE_OPS_PROVIDER_NAME = 
"config-based-provider";
+
+  @VisibleForTesting Map<String, IcebergConfig> catalogConfigs;
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    this.catalogConfigs =
+        properties.keySet().stream()
+            .map(this::getCatalogName)
+            .filter(Optional::isPresent)
+            .map(Optional::get)
+            .distinct()
+            .collect(
+                Collectors.toMap(
+                    catalogName -> catalogName,
+                    catalogName ->
+                        new IcebergConfig(
+                            MapUtils.getPrefixMap(
+                                properties, String.format("catalog.%s.", 
catalogName)))));
+    this.catalogConfigs.put(
+        IcebergConstants.GRAVITINO_DEFAULT_CATALOG, new 
IcebergConfig(properties));
+  }
+
+  @Override
+  public IcebergTableOps getIcebergTableOps(String catalogName) {
+    IcebergConfig icebergConfig = this.catalogConfigs.get(catalogName);
+    if (icebergConfig == null) {
+      String errorMsg = String.format("%s can not match any catalog", 
catalogName);
+      LOG.warn(errorMsg);
+      throw new RuntimeException(errorMsg);
+    }
+    return new IcebergTableOps(icebergConfig);
+  }
+
+  private Optional<String> getCatalogName(String catalogConfigKey) {
+    if (!catalogConfigKey.startsWith("catalog.")) {
+      return Optional.empty();
+    }
+    // The catalogConfigKey's format is catalog.<catalog_name>.<param_name>
+    if (catalogConfigKey.split("\\.").length < 3) {
+      LOG.warn("{} format is illegal", catalogConfigKey);
+      return Optional.empty();
+    }
+    return Optional.of(catalogConfigKey.split("\\.")[1]);
+  }
+}
diff --git 
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergTableOpsManager.java
 
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergTableOpsManager.java
new file mode 100644
index 000000000..7b6270be6
--- /dev/null
+++ 
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergTableOpsManager.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.iceberg.common.ops;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.commons.lang.StringUtils;
+import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
+import org.apache.gravitino.iceberg.common.IcebergConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class IcebergTableOpsManager implements AutoCloseable {
+  public static final Logger LOG = 
LoggerFactory.getLogger(IcebergTableOpsManager.class);
+
+  private static final ImmutableMap<String, String> 
ICEBERG_TABLE_OPS_PROVIDER_NAMES =
+      ImmutableMap.of(
+          
ConfigBasedIcebergTableOpsProvider.CONFIG_BASE_ICEBERG_TABLE_OPS_PROVIDER_NAME,
+          ConfigBasedIcebergTableOpsProvider.class.getCanonicalName());
+
+  private final Cache<String, IcebergTableOps> icebergTableOpsCache;
+
+  private final IcebergTableOpsProvider provider;
+
+  public IcebergTableOpsManager(Map<String, String> properties) {
+    this.icebergTableOpsCache = Caffeine.newBuilder().build();
+    this.provider = createProvider(properties);
+    this.provider.initialize(properties);
+  }
+
+  /**
+   * @param rawPrefix The path parameter is passed by a Jetty handler. The 
pattern is matching
+   *     ([^/]*\/), end with /
+   * @return the instance of IcebergTableOps.
+   */
+  public IcebergTableOps getOps(String rawPrefix) {
+    String catalogName = getCatalogName(rawPrefix);
+    return icebergTableOpsCache.get(catalogName, k -> 
provider.getIcebergTableOps(catalogName));
+  }
+
+  private String getCatalogName(String rawPrefix) {
+    String prefix = shelling(rawPrefix);
+    Preconditions.checkArgument(
+        !IcebergConstants.GRAVITINO_DEFAULT_CATALOG.equals(prefix),
+        String.format("%s is conflict with reserved key, please replace it", 
prefix));
+    if (StringUtils.isBlank(prefix)) {
+      return IcebergConstants.GRAVITINO_DEFAULT_CATALOG;
+    }
+    return prefix;
+  }
+
+  private IcebergTableOpsProvider createProvider(Map<String, String> 
properties) {
+    String providerName =
+        (new 
IcebergConfig(properties)).get(IcebergConfig.ICEBERG_REST_CATALOG_PROVIDER);
+    String className = 
ICEBERG_TABLE_OPS_PROVIDER_NAMES.getOrDefault(providerName, providerName);
+    LOG.info("Load Iceberg catalog provider: {}.", className);
+    try {
+      Class<?> providerClz = Class.forName(className);
+      return (IcebergTableOpsProvider) 
providerClz.getDeclaredConstructor().newInstance();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private String shelling(String rawPrefix) {
+    if (StringUtils.isBlank(rawPrefix)) {
+      return rawPrefix;
+    } else {
+      // rawPrefix is a string matching ([^/]*/) which end with /
+      Preconditions.checkArgument(
+          rawPrefix.endsWith("/"), String.format("rawPrefix %s format is 
illegal", rawPrefix));
+      return rawPrefix.substring(0, rawPrefix.length() - 1);
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    icebergTableOpsCache.invalidateAll();
+  }
+}
diff --git 
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergTableOpsProvider.java
 
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergTableOpsProvider.java
new file mode 100644
index 000000000..cda5ac20a
--- /dev/null
+++ 
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergTableOpsProvider.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.iceberg.common.ops;
+
+import java.util.Map;
+
+/**
+ * IcebergTableOpsProvider is an interface defining how Iceberg REST catalog 
server gets Iceberg
+ * catalogs.
+ */
+public interface IcebergTableOpsProvider {
+
+  /**
+   * @param properties The parameters for creating Provider which from 
configurations whose prefix
+   *     is 'gravitino.iceberg-rest.'
+   */
+  void initialize(Map<String, String> properties);
+
+  /**
+   * @param catalogName a param send by clients.
+   * @return the instance of IcebergTableOps.
+   */
+  IcebergTableOps getIcebergTableOps(String catalogName);
+}
diff --git 
a/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/ops/TestConfigBasedIcebergTableOpsProvider.java
 
b/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/ops/TestConfigBasedIcebergTableOpsProvider.java
new file mode 100644
index 000000000..c3c8c3fe8
--- /dev/null
+++ 
b/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/ops/TestConfigBasedIcebergTableOpsProvider.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.iceberg.common.ops;
+
+import com.google.common.collect.Maps;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
+import org.apache.gravitino.iceberg.common.IcebergConfig;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.inmemory.InMemoryCatalog;
+import org.apache.iceberg.jdbc.JdbcCatalog;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+public class TestConfigBasedIcebergTableOpsProvider {
+  private static final String STORE_PATH =
+      "/tmp/gravitino_test_iceberg_jdbc_backend_" + 
UUID.randomUUID().toString().replace("-", "");
+
+  @Test
+  public void testValidIcebergTableOps() {
+    String hiveCatalogName = "hive_backend";
+    String jdbcCatalogName = "jdbc_backend";
+    String defaultCatalogName = IcebergConstants.GRAVITINO_DEFAULT_CATALOG;
+
+    Map<String, String> config = Maps.newHashMap();
+    // hive backend catalog
+    config.put("catalog.hive_backend.catalog-backend-name", hiveCatalogName);
+    config.put("catalog.hive_backend.catalog-backend", "hive");
+    config.put("catalog.hive_backend.uri", "thrift://127.0.0.1:9083");
+    config.put("catalog.hive_backend.warehouse", "/tmp/usr/hive/warehouse");
+    // jdbc backend catalog
+    config.put("catalog.jdbc_backend.catalog-backend-name", jdbcCatalogName);
+    config.put("catalog.jdbc_backend.catalog-backend", "jdbc");
+    config.put(
+        "catalog.jdbc_backend.uri",
+        String.format("jdbc:h2:%s;DB_CLOSE_DELAY=-1;MODE=MYSQL", STORE_PATH));
+    config.put("catalog.jdbc_backend.warehouse", "/tmp/usr/jdbc/warehouse");
+    config.put("catalog.jdbc_backend.jdbc.password", "gravitino");
+    config.put("catalog.jdbc_backend.jdbc.user", "gravitino");
+    config.put("catalog.jdbc_backend.jdbc-driver", "org.h2.Driver");
+    config.put("catalog.jdbc_backend.jdbc-initialize", "true");
+    // default catalog
+    config.put("catalog-backend-name", defaultCatalogName);
+    config.put("catalog-backend", "memory");
+    config.put("warehouse", "/tmp/");
+
+    ConfigBasedIcebergTableOpsProvider provider = new 
ConfigBasedIcebergTableOpsProvider();
+    provider.initialize(config);
+
+    IcebergConfig hiveIcebergConfig = 
provider.catalogConfigs.get(hiveCatalogName);
+    IcebergConfig jdbcIcebergConfig = 
provider.catalogConfigs.get(jdbcCatalogName);
+    IcebergConfig defaultIcebergConfig = 
provider.catalogConfigs.get(defaultCatalogName);
+    IcebergTableOps hiveOps = provider.getIcebergTableOps(hiveCatalogName);
+    IcebergTableOps jdbcOps = provider.getIcebergTableOps(jdbcCatalogName);
+    IcebergTableOps defaultOps = 
provider.getIcebergTableOps(defaultCatalogName);
+
+    Assertions.assertEquals(
+        hiveCatalogName, 
hiveIcebergConfig.get(IcebergConfig.CATALOG_BACKEND_NAME));
+    Assertions.assertEquals("hive", 
hiveIcebergConfig.get(IcebergConfig.CATALOG_BACKEND));
+    Assertions.assertEquals(
+        "thrift://127.0.0.1:9083", 
hiveIcebergConfig.get(IcebergConfig.CATALOG_URI));
+    Assertions.assertEquals(
+        "/tmp/usr/hive/warehouse", 
hiveIcebergConfig.get(IcebergConfig.CATALOG_WAREHOUSE));
+
+    Assertions.assertEquals(
+        jdbcCatalogName, 
jdbcIcebergConfig.get(IcebergConfig.CATALOG_BACKEND_NAME));
+    Assertions.assertEquals("jdbc", 
jdbcIcebergConfig.get(IcebergConfig.CATALOG_BACKEND));
+    Assertions.assertEquals(
+        "/tmp/usr/jdbc/warehouse", 
jdbcIcebergConfig.get(IcebergConfig.CATALOG_WAREHOUSE));
+    Assertions.assertEquals("org.h2.Driver", 
jdbcIcebergConfig.get(IcebergConfig.JDBC_DRIVER));
+    Assertions.assertEquals(true, 
jdbcIcebergConfig.get(IcebergConfig.JDBC_INIT_TABLES));
+
+    Assertions.assertEquals(
+        defaultCatalogName, 
defaultIcebergConfig.get(IcebergConfig.CATALOG_BACKEND_NAME));
+    Assertions.assertEquals("memory", 
defaultIcebergConfig.get(IcebergConfig.CATALOG_BACKEND));
+    Assertions.assertEquals("/tmp/", 
defaultIcebergConfig.get(IcebergConfig.CATALOG_WAREHOUSE));
+
+    Assertions.assertEquals(hiveCatalogName, hiveOps.catalog.name());
+    Assertions.assertEquals(jdbcCatalogName, jdbcOps.catalog.name());
+    Assertions.assertEquals(defaultCatalogName, defaultOps.catalog.name());
+
+    Assertions.assertTrue(hiveOps.catalog instanceof HiveCatalog);
+    Assertions.assertTrue(jdbcOps.catalog instanceof JdbcCatalog);
+    Assertions.assertTrue(defaultOps.catalog instanceof InMemoryCatalog);
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = {"", "not_match"})
+  public void testInvalidIcebergTableOps(String catalogName) {
+    ConfigBasedIcebergTableOpsProvider provider = new 
ConfigBasedIcebergTableOpsProvider();
+    provider.initialize(Maps.newHashMap());
+
+    Assertions.assertThrowsExactly(
+        RuntimeException.class, () -> 
provider.getIcebergTableOps(catalogName));
+  }
+}
diff --git 
a/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/ops/TestIcebergTableOpsManager.java
 
b/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/ops/TestIcebergTableOpsManager.java
new file mode 100644
index 000000000..04ded71dd
--- /dev/null
+++ 
b/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/ops/TestIcebergTableOpsManager.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.iceberg.common.ops;
+
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.commons.lang.StringUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+public class TestIcebergTableOpsManager {
+
+  private static final String DEFAULT_CATALOG = "memory";
+
+  @ParameterizedTest
+  @ValueSource(strings = {"", "hello/", "\\\n\t\\\'/", "\u0024/", "\100/", 
"[_~/"})
+  public void testValidGetOps(String rawPrefix) {
+    String prefix = rawPrefix;
+    if (!StringUtils.isBlank(rawPrefix)) {
+      prefix = rawPrefix.substring(0, rawPrefix.length() - 1);
+    }
+    Map<String, String> config = Maps.newHashMap();
+    config.put(String.format("catalog.%s.catalog-backend-name", prefix), 
prefix);
+    IcebergTableOpsManager manager = new IcebergTableOpsManager(config);
+
+    IcebergTableOps ops = manager.getOps(rawPrefix);
+
+    if (StringUtils.isBlank(prefix)) {
+      Assertions.assertEquals(ops.catalog.name(), DEFAULT_CATALOG);
+    } else {
+      Assertions.assertEquals(ops.catalog.name(), prefix);
+    }
+  }
+
+  @ParameterizedTest
+  @ValueSource(
+      strings = {"hello", "\\\n\t\\\'", "\u0024", "\100", "[_~", 
"__gravitino_default_catalog/"})
+  public void testInvalidGetOps(String rawPrefix) {
+    Map<String, String> config = Maps.newHashMap();
+    IcebergTableOpsManager manager = new IcebergTableOpsManager(config);
+
+    Assertions.assertThrowsExactly(IllegalArgumentException.class, () -> 
manager.getOps(rawPrefix));
+  }
+}
diff --git 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java
 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java
index 0a1bf752d..dd3b9d9ab 100644
--- 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java
+++ 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java
@@ -23,7 +23,7 @@ import javax.servlet.Servlet;
 import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.auxiliary.GravitinoAuxiliaryService;
 import org.apache.gravitino.iceberg.common.IcebergConfig;
-import org.apache.gravitino.iceberg.common.ops.IcebergTableOps;
+import org.apache.gravitino.iceberg.common.ops.IcebergTableOpsManager;
 import org.apache.gravitino.iceberg.service.IcebergExceptionMapper;
 import org.apache.gravitino.iceberg.service.IcebergObjectMapperProvider;
 import org.apache.gravitino.iceberg.service.metrics.IcebergMetricsManager;
@@ -48,7 +48,7 @@ public class RESTService implements GravitinoAuxiliaryService 
{
   public static final String SERVICE_NAME = "iceberg-rest";
   public static final String ICEBERG_SPEC = "/iceberg/*";
 
-  private IcebergTableOps icebergTableOps;
+  private IcebergTableOpsManager icebergTableOpsManager;
   private IcebergMetricsManager icebergMetricsManager;
 
   private void initServer(IcebergConfig icebergConfig) {
@@ -66,13 +66,13 @@ public class RESTService implements 
GravitinoAuxiliaryService {
         new 
HttpServerMetricsSource(MetricsSource.ICEBERG_REST_SERVER_METRIC_NAME, config, 
server);
     metricsSystem.register(httpServerMetricsSource);
 
-    icebergTableOps = new IcebergTableOps(icebergConfig);
+    icebergTableOpsManager = new 
IcebergTableOpsManager(icebergConfig.getAllConfig());
     icebergMetricsManager = new IcebergMetricsManager(icebergConfig);
     config.register(
         new AbstractBinder() {
           @Override
           protected void configure() {
-            bind(icebergTableOps).to(IcebergTableOps.class).ranked(1);
+            
bind(icebergTableOpsManager).to(IcebergTableOpsManager.class).ranked(1);
             
bind(icebergMetricsManager).to(IcebergMetricsManager.class).ranked(1);
           }
         });
@@ -114,8 +114,8 @@ public class RESTService implements 
GravitinoAuxiliaryService {
       server.stop();
       LOG.info("Iceberg REST service stopped");
     }
-    if (icebergTableOps != null) {
-      icebergTableOps.close();
+    if (icebergTableOpsManager != null) {
+      icebergTableOpsManager.close();
     }
     if (icebergMetricsManager != null) {
       icebergMetricsManager.close();
diff --git 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergNamespaceOperations.java
 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergNamespaceOperations.java
index b37fab80b..82b6250e6 100644
--- 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergNamespaceOperations.java
+++ 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergNamespaceOperations.java
@@ -34,7 +34,7 @@ import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
-import org.apache.gravitino.iceberg.common.ops.IcebergTableOps;
+import org.apache.gravitino.iceberg.common.ops.IcebergTableOpsManager;
 import org.apache.gravitino.iceberg.service.IcebergRestUtils;
 import org.apache.gravitino.metrics.MetricNames;
 import org.apache.iceberg.catalog.Namespace;
@@ -57,25 +57,27 @@ public class IcebergNamespaceOperations {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(IcebergNamespaceOperations.class);
 
-  private IcebergTableOps icebergTableOps;
+  private IcebergTableOpsManager icebergTableOpsManager;
 
   @SuppressWarnings("UnusedVariable")
   @Context
   private HttpServletRequest httpRequest;
 
   @Inject
-  public IcebergNamespaceOperations(IcebergTableOps icebergTableOps) {
-    this.icebergTableOps = icebergTableOps;
+  public IcebergNamespaceOperations(IcebergTableOpsManager 
icebergTableOpsManager) {
+    this.icebergTableOpsManager = icebergTableOpsManager;
   }
 
   @GET
   @Produces(MediaType.APPLICATION_JSON)
   @Timed(name = "list-namespace." + MetricNames.HTTP_PROCESS_DURATION, 
absolute = true)
   @ResponseMetered(name = "list-namespace", absolute = true)
-  public Response listNamespaces(@DefaultValue("") @QueryParam("parent") 
String parent) {
+  public Response listNamespaces(
+      @DefaultValue("") @QueryParam("parent") String parent, 
@PathParam("prefix") String prefix) {
     Namespace parentNamespace =
         parent.isEmpty() ? Namespace.empty() : 
RESTUtil.decodeNamespace(parent);
-    ListNamespacesResponse response = 
icebergTableOps.listNamespace(parentNamespace);
+    ListNamespacesResponse response =
+        icebergTableOpsManager.getOps(prefix).listNamespace(parentNamespace);
     return IcebergRestUtils.ok(response);
   }
 
@@ -84,9 +86,10 @@ public class IcebergNamespaceOperations {
   @Produces(MediaType.APPLICATION_JSON)
   @Timed(name = "load-namespace." + MetricNames.HTTP_PROCESS_DURATION, 
absolute = true)
   @ResponseMetered(name = "load-namespace", absolute = true)
-  public Response loadNamespace(@PathParam("namespace") String namespace) {
+  public Response loadNamespace(
+      @PathParam("prefix") String prefix, @PathParam("namespace") String 
namespace) {
     GetNamespaceResponse getNamespaceResponse =
-        icebergTableOps.loadNamespace(RESTUtil.decodeNamespace(namespace));
+        
icebergTableOpsManager.getOps(prefix).loadNamespace(RESTUtil.decodeNamespace(namespace));
     return IcebergRestUtils.ok(getNamespaceResponse);
   }
 
@@ -95,10 +98,11 @@ public class IcebergNamespaceOperations {
   @Produces(MediaType.APPLICATION_JSON)
   @Timed(name = "drop-namespace." + MetricNames.HTTP_PROCESS_DURATION, 
absolute = true)
   @ResponseMetered(name = "drop-namespace", absolute = true)
-  public Response dropNamespace(@PathParam("namespace") String namespace) {
+  public Response dropNamespace(
+      @PathParam("prefix") String prefix, @PathParam("namespace") String 
namespace) {
     // todo check if table exists in namespace after table ops is added
-    LOG.info("Drop Iceberg namespace: {}", namespace);
-    icebergTableOps.dropNamespace(RESTUtil.decodeNamespace(namespace));
+    LOG.info("Drop Iceberg namespace: {}, prefix: {}", namespace, prefix);
+    
icebergTableOpsManager.getOps(prefix).dropNamespace(RESTUtil.decodeNamespace(namespace));
     return IcebergRestUtils.noContent();
   }
 
@@ -106,9 +110,11 @@ public class IcebergNamespaceOperations {
   @Produces(MediaType.APPLICATION_JSON)
   @Timed(name = "create-namespace." + MetricNames.HTTP_PROCESS_DURATION, 
absolute = true)
   @ResponseMetered(name = "create-namespace", absolute = true)
-  public Response createNamespace(CreateNamespaceRequest namespaceRequest) {
-    LOG.info("Create Iceberg namespace: {}", namespaceRequest);
-    CreateNamespaceResponse response = 
icebergTableOps.createNamespace(namespaceRequest);
+  public Response createNamespace(
+      @PathParam("prefix") String prefix, CreateNamespaceRequest 
namespaceRequest) {
+    LOG.info("Create Iceberg namespace: {}, prefix: {}", namespaceRequest, 
prefix);
+    CreateNamespaceResponse response =
+        
icebergTableOpsManager.getOps(prefix).createNamespace(namespaceRequest);
     return IcebergRestUtils.ok(response);
   }
 
@@ -118,10 +124,14 @@ public class IcebergNamespaceOperations {
   @Timed(name = "update-namespace." + MetricNames.HTTP_PROCESS_DURATION, 
absolute = true)
   @ResponseMetered(name = "update-namespace", absolute = true)
   public Response updateNamespace(
-      @PathParam("namespace") String namespace, 
UpdateNamespacePropertiesRequest request) {
-    LOG.info("Update Iceberg namespace: {}, request: {}", namespace, request);
+      @PathParam("prefix") String prefix,
+      @PathParam("namespace") String namespace,
+      UpdateNamespacePropertiesRequest request) {
+    LOG.info("Update Iceberg namespace: {}, request: {}, prefix: {}", 
namespace, request, prefix);
     UpdateNamespacePropertiesResponse response =
-        
icebergTableOps.updateNamespaceProperties(RESTUtil.decodeNamespace(namespace), 
request);
+        icebergTableOpsManager
+            .getOps(prefix)
+            .updateNamespaceProperties(RESTUtil.decodeNamespace(namespace), 
request);
     return IcebergRestUtils.ok(response);
   }
 
@@ -131,10 +141,14 @@ public class IcebergNamespaceOperations {
   @Timed(name = "register-table." + MetricNames.HTTP_PROCESS_DURATION, 
absolute = true)
   @ResponseMetered(name = "register-table", absolute = true)
   public Response registerTable(
-      @PathParam("namespace") String namespace, RegisterTableRequest request) {
+      @PathParam("prefix") String prefix,
+      @PathParam("namespace") String namespace,
+      RegisterTableRequest request) {
     LOG.info("Register table, namespace: {}, request: {}", namespace, request);
     LoadTableResponse response =
-        icebergTableOps.registerTable(RESTUtil.decodeNamespace(namespace), 
request);
+        icebergTableOpsManager
+            .getOps(prefix)
+            .registerTable(RESTUtil.decodeNamespace(namespace), request);
     return IcebergRestUtils.ok(response);
   }
 }
diff --git 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java
 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java
index 0b35f45c7..f6f6042a5 100644
--- 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java
+++ 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java
@@ -37,7 +37,7 @@ import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
-import org.apache.gravitino.iceberg.common.ops.IcebergTableOps;
+import org.apache.gravitino.iceberg.common.ops.IcebergTableOpsManager;
 import org.apache.gravitino.iceberg.service.IcebergObjectMapper;
 import org.apache.gravitino.iceberg.service.IcebergRestUtils;
 import org.apache.gravitino.iceberg.service.metrics.IcebergMetricsManager;
@@ -57,7 +57,7 @@ public class IcebergTableOperations {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(IcebergTableOperations.class);
 
-  private IcebergTableOps icebergTableOps;
+  private IcebergTableOpsManager icebergTableOpsManager;
   private IcebergMetricsManager icebergMetricsManager;
 
   private ObjectMapper icebergObjectMapper;
@@ -68,8 +68,8 @@ public class IcebergTableOperations {
 
   @Inject
   public IcebergTableOperations(
-      IcebergTableOps icebergTableOps, IcebergMetricsManager 
icebergMetricsManager) {
-    this.icebergTableOps = icebergTableOps;
+      IcebergTableOpsManager icebergTableOpsManager, IcebergMetricsManager 
icebergMetricsManager) {
+    this.icebergTableOpsManager = icebergTableOpsManager;
     this.icebergObjectMapper = IcebergObjectMapper.getInstance();
     this.icebergMetricsManager = icebergMetricsManager;
   }
@@ -78,8 +78,10 @@ public class IcebergTableOperations {
   @Produces(MediaType.APPLICATION_JSON)
   @Timed(name = "list-table." + MetricNames.HTTP_PROCESS_DURATION, absolute = 
true)
   @ResponseMetered(name = "list-table", absolute = true)
-  public Response listTable(@PathParam("namespace") String namespace) {
-    return 
IcebergRestUtils.ok(icebergTableOps.listTable(RESTUtil.decodeNamespace(namespace)));
+  public Response listTable(
+      @PathParam("prefix") String prefix, @PathParam("namespace") String 
namespace) {
+    return IcebergRestUtils.ok(
+        
icebergTableOpsManager.getOps(prefix).listTable(RESTUtil.decodeNamespace(namespace)));
   }
 
   @POST
@@ -87,13 +89,17 @@ public class IcebergTableOperations {
   @Timed(name = "create-table." + MetricNames.HTTP_PROCESS_DURATION, absolute 
= true)
   @ResponseMetered(name = "create-table", absolute = true)
   public Response createTable(
-      @PathParam("namespace") String namespace, CreateTableRequest 
createTableRequest) {
+      @PathParam("prefix") String prefix,
+      @PathParam("namespace") String namespace,
+      CreateTableRequest createTableRequest) {
     LOG.info(
         "Create Iceberg table, namespace: {}, create table request: {}",
         namespace,
         createTableRequest);
     return IcebergRestUtils.ok(
-        icebergTableOps.createTable(RESTUtil.decodeNamespace(namespace), 
createTableRequest));
+        icebergTableOpsManager
+            .getOps(prefix)
+            .createTable(RESTUtil.decodeNamespace(namespace), 
createTableRequest));
   }
 
   @POST
@@ -102,6 +108,7 @@ public class IcebergTableOperations {
   @Timed(name = "update-table." + MetricNames.HTTP_PROCESS_DURATION, absolute 
= true)
   @ResponseMetered(name = "update-table", absolute = true)
   public Response updateTable(
+      @PathParam("prefix") String prefix,
       @PathParam("namespace") String namespace,
       @PathParam("table") String table,
       UpdateTableRequest updateTableRequest) {
@@ -114,7 +121,8 @@ public class IcebergTableOperations {
     }
     TableIdentifier tableIdentifier =
         TableIdentifier.of(RESTUtil.decodeNamespace(namespace), table);
-    return IcebergRestUtils.ok(icebergTableOps.updateTable(tableIdentifier, 
updateTableRequest));
+    return IcebergRestUtils.ok(
+        icebergTableOpsManager.getOps(prefix).updateTable(tableIdentifier, 
updateTableRequest));
   }
 
   @DELETE
@@ -123,6 +131,7 @@ public class IcebergTableOperations {
   @Timed(name = "drop-table." + MetricNames.HTTP_PROCESS_DURATION, absolute = 
true)
   @ResponseMetered(name = "drop-table", absolute = true)
   public Response dropTable(
+      @PathParam("prefix") String prefix,
       @PathParam("namespace") String namespace,
       @PathParam("table") String table,
       @DefaultValue("false") @QueryParam("purgeRequested") boolean 
purgeRequested) {
@@ -134,9 +143,9 @@ public class IcebergTableOperations {
     TableIdentifier tableIdentifier =
         TableIdentifier.of(RESTUtil.decodeNamespace(namespace), table);
     if (purgeRequested) {
-      icebergTableOps.purgeTable(tableIdentifier);
+      icebergTableOpsManager.getOps(prefix).purgeTable(tableIdentifier);
     } else {
-      icebergTableOps.dropTable(tableIdentifier);
+      icebergTableOpsManager.getOps(prefix).dropTable(tableIdentifier);
     }
     return IcebergRestUtils.noContent();
   }
@@ -147,13 +156,14 @@ public class IcebergTableOperations {
   @Timed(name = "load-table." + MetricNames.HTTP_PROCESS_DURATION, absolute = 
true)
   @ResponseMetered(name = "load-table", absolute = true)
   public Response loadTable(
+      @PathParam("prefix") String prefix,
       @PathParam("namespace") String namespace,
       @PathParam("table") String table,
       @DefaultValue("all") @QueryParam("snapshots") String snapshots) {
     // todo support snapshots
     TableIdentifier tableIdentifier =
         TableIdentifier.of(RESTUtil.decodeNamespace(namespace), table);
-    return IcebergRestUtils.ok(icebergTableOps.loadTable(tableIdentifier));
+    return 
IcebergRestUtils.ok(icebergTableOpsManager.getOps(prefix).loadTable(tableIdentifier));
   }
 
   @HEAD
@@ -162,10 +172,12 @@ public class IcebergTableOperations {
   @Timed(name = "table-exists." + MetricNames.HTTP_PROCESS_DURATION, absolute 
= true)
   @ResponseMetered(name = "table-exits", absolute = true)
   public Response tableExists(
-      @PathParam("namespace") String namespace, @PathParam("table") String 
table) {
+      @PathParam("prefix") String prefix,
+      @PathParam("namespace") String namespace,
+      @PathParam("table") String table) {
     TableIdentifier tableIdentifier =
         TableIdentifier.of(RESTUtil.decodeNamespace(namespace), table);
-    if (icebergTableOps.tableExists(tableIdentifier)) {
+    if (icebergTableOpsManager.getOps(prefix).tableExists(tableIdentifier)) {
       return IcebergRestUtils.okWithoutContent();
     } else {
       return IcebergRestUtils.notExists();
@@ -178,6 +190,7 @@ public class IcebergTableOperations {
   @Timed(name = "report-table-metrics." + MetricNames.HTTP_PROCESS_DURATION, 
absolute = true)
   @ResponseMetered(name = "report-table-metrics", absolute = true)
   public Response reportTableMetrics(
+      @PathParam("prefix") String prefix,
       @PathParam("namespace") String namespace,
       @PathParam("table") String table,
       ReportMetricsRequest request) {
diff --git 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableRenameOperations.java
 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableRenameOperations.java
index 2b8585ba6..441ce0551 100644
--- 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableRenameOperations.java
+++ 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableRenameOperations.java
@@ -25,11 +25,12 @@ import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.POST;
 import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
-import org.apache.gravitino.iceberg.common.ops.IcebergTableOps;
+import org.apache.gravitino.iceberg.common.ops.IcebergTableOpsManager;
 import org.apache.gravitino.iceberg.service.IcebergRestUtils;
 import org.apache.gravitino.metrics.MetricNames;
 import org.apache.iceberg.rest.requests.RenameTableRequest;
@@ -43,19 +44,20 @@ public class IcebergTableRenameOperations {
   @Context
   private HttpServletRequest httpRequest;
 
-  private IcebergTableOps icebergTableOps;
+  private IcebergTableOpsManager icebergTableOpsManager;
 
   @Inject
-  public IcebergTableRenameOperations(IcebergTableOps icebergTableOps) {
-    this.icebergTableOps = icebergTableOps;
+  public IcebergTableRenameOperations(IcebergTableOpsManager 
icebergTableOpsManager) {
+    this.icebergTableOpsManager = icebergTableOpsManager;
   }
 
   @POST
   @Produces(MediaType.APPLICATION_JSON)
   @Timed(name = "rename-table." + MetricNames.HTTP_PROCESS_DURATION, absolute 
= true)
   @ResponseMetered(name = "rename-table", absolute = true)
-  public Response renameTable(RenameTableRequest renameTableRequest) {
-    icebergTableOps.renameTable(renameTableRequest);
+  public Response renameTable(
+      @PathParam("prefix") String prefix, RenameTableRequest 
renameTableRequest) {
+    icebergTableOpsManager.getOps(prefix).renameTable(renameTableRequest);
     return IcebergRestUtils.okWithoutContent();
   }
 }
diff --git 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/util/IcebergRESTServerManager.java
 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/util/IcebergRESTServerManager.java
index 6e2aa5c28..c70888d7b 100644
--- 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/util/IcebergRESTServerManager.java
+++ 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/util/IcebergRESTServerManager.java
@@ -123,7 +123,6 @@ public abstract class IcebergRESTServerManager {
         String.valueOf(RESTUtils.findAvailablePort(2000, 3000)));
 
     configMap.putAll(customConfigs);
-
     ITUtils.rewriteConfigFile(configTempFileName, configFileName, configMap);
   }
 
diff --git 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/ConfigBasedIcebergTableOpsProviderForTest.java
 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/ConfigBasedIcebergTableOpsProviderForTest.java
new file mode 100644
index 000000000..6a9375615
--- /dev/null
+++ 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/ConfigBasedIcebergTableOpsProviderForTest.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.iceberg.service.rest;
+
+import 
org.apache.gravitino.iceberg.common.ops.ConfigBasedIcebergTableOpsProvider;
+import org.apache.gravitino.iceberg.common.ops.IcebergTableOps;
+
+public class ConfigBasedIcebergTableOpsProviderForTest extends 
ConfigBasedIcebergTableOpsProvider {
+  @Override
+  public IcebergTableOps getIcebergTableOps(String prefix) {
+    return new IcebergTableOpsForTest();
+  }
+}
diff --git 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java
 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java
index 0e3d29e49..e7b19e0af 100644
--- 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java
+++ 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java
@@ -19,10 +19,13 @@
 
 package org.apache.gravitino.iceberg.service.rest;
 
+import com.google.common.collect.Maps;
+import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
+import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
 import org.apache.gravitino.iceberg.common.IcebergConfig;
-import org.apache.gravitino.iceberg.common.ops.IcebergTableOps;
+import org.apache.gravitino.iceberg.common.ops.IcebergTableOpsManager;
 import org.apache.gravitino.iceberg.service.IcebergExceptionMapper;
 import org.apache.gravitino.iceberg.service.IcebergObjectMapperProvider;
 import org.apache.gravitino.iceberg.service.metrics.IcebergMetricsManager;
@@ -66,13 +69,19 @@ public class IcebergRestTestUtil {
     }
 
     if (bindIcebergTableOps) {
-      IcebergTableOps icebergTableOps = new IcebergTableOpsForTest();
+      Map<String, String> catalogConf = Maps.newHashMap();
+      catalogConf.put(String.format("catalog.%s.catalog-backend-name", 
PREFIX), PREFIX);
+      catalogConf.put(
+          IcebergConstants.ICEBERG_REST_CATALOG_PROVIDER,
+          ConfigBasedIcebergTableOpsProviderForTest.class.getName());
+      IcebergTableOpsManager icebergTableOpsManager = new 
IcebergTableOpsManager(catalogConf);
+
       IcebergMetricsManager icebergMetricsManager = new 
IcebergMetricsManager(new IcebergConfig());
       resourceConfig.register(
           new AbstractBinder() {
             @Override
             protected void configure() {
-              bind(icebergTableOps).to(IcebergTableOps.class).ranked(2);
+              
bind(icebergTableOpsManager).to(IcebergTableOpsManager.class).ranked(2);
               
bind(icebergMetricsManager).to(IcebergMetricsManager.class).ranked(2);
             }
           });
diff --git 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergTestBase.java
 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergTestBase.java
index ee672bf09..7d1d80b54 100644
--- 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergTestBase.java
+++ 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergTestBase.java
@@ -27,6 +27,7 @@ import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.Application;
 import javax.ws.rs.core.MediaType;
 import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.iceberg.service.IcebergObjectMapperProvider;
 import org.glassfish.jersey.server.ResourceConfig;
 import org.glassfish.jersey.test.JerseyTest;
@@ -38,7 +39,7 @@ public class IcebergTestBase extends JerseyTest {
     return new ResourceConfig();
   }
 
-  private boolean urlPathWithPrefix = false;
+  private String urlPathPrefix = "";
 
   public Invocation.Builder getRenameTableClientBuilder() {
     return getIcebergClientBuilder(IcebergRestTestUtil.RENAME_TABLE_PATH, 
Optional.empty());
@@ -109,8 +110,8 @@ public class IcebergTestBase extends JerseyTest {
 
   public Invocation.Builder getIcebergClientBuilder(
       String path, Optional<Map<String, String>> queryParam) {
-    if (urlPathWithPrefix) {
-      path = injectPrefixToPath(path, IcebergRestTestUtil.PREFIX);
+    if (!StringUtils.isBlank(urlPathPrefix)) {
+      path = injectPrefixToPath(path, urlPathPrefix);
     }
     WebTarget target = target(path);
     if (queryParam.isPresent()) {
@@ -126,7 +127,7 @@ public class IcebergTestBase extends JerseyTest {
         .accept(MediaType.APPLICATION_JSON_TYPE);
   }
 
-  public void setUrlPathWithPrefix(boolean urlPathWithPrefix) {
-    this.urlPathWithPrefix = urlPathWithPrefix;
+  public void setUrlPathWithPrefix(String urlPathPrefix) {
+    this.urlPathPrefix = urlPathPrefix;
   }
 }
diff --git 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergConfig.java
 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergConfig.java
index b2f9e09ef..1116fc0bb 100644
--- 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergConfig.java
+++ 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergConfig.java
@@ -32,13 +32,13 @@ public class TestIcebergConfig extends IcebergTestBase {
 
   @Override
   protected Application configure() {
-    return 
IcebergRestTestUtil.getIcebergResourceConfig(IcebergConfigOperations.class, 
false);
+    return 
IcebergRestTestUtil.getIcebergResourceConfig(IcebergConfigOperations.class);
   }
 
   @ParameterizedTest
-  @ValueSource(booleans = {true, false})
-  public void testConfig(boolean withPrefix) {
-    setUrlPathWithPrefix(withPrefix);
+  @ValueSource(strings = {"", IcebergRestTestUtil.PREFIX})
+  public void testConfig(String prefix) {
+    setUrlPathWithPrefix(prefix);
     Response resp = getConfigClientBuilder().get();
     Assertions.assertEquals(Response.Status.OK.getStatusCode(), 
resp.getStatus());
     Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, 
resp.getMediaType());
diff --git 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergNamespaceOperations.java
 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergNamespaceOperations.java
index 251f9e907..6049a153e 100644
--- 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergNamespaceOperations.java
+++ 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergNamespaceOperations.java
@@ -209,9 +209,9 @@ public class TestIcebergNamespaceOperations extends 
IcebergTestBase {
   }
 
   @ParameterizedTest
-  @ValueSource(booleans = {true, false})
-  void testListNamespace(boolean withPrefix) {
-    setUrlPathWithPrefix(withPrefix);
+  @ValueSource(strings = {"", IcebergRestTestUtil.PREFIX})
+  void testListNamespace(String prefix) {
+    setUrlPathWithPrefix(prefix);
     dropAllExistingNamespace();
     verifyListNamespaceSucc(Optional.empty(), Arrays.asList());
 
diff --git 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergTableOperations.java
 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergTableOperations.java
index e2d49d985..6037302b8 100644
--- 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergTableOperations.java
+++ 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergTableOperations.java
@@ -260,9 +260,9 @@ public class TestIcebergTableOperations extends 
TestIcebergNamespaceOperations {
   }
 
   @ParameterizedTest
-  @ValueSource(booleans = {true, false})
-  void testListTables(boolean withPrefix) {
-    setUrlPathWithPrefix(withPrefix);
+  @ValueSource(strings = {"", IcebergRestTestUtil.PREFIX})
+  void testListTables(String prefix) {
+    setUrlPathWithPrefix(prefix);
     verifyListTableFail(404);
 
     verifyCreateNamespaceSucc(IcebergRestTestUtil.TEST_NAMESPACE_NAME);
@@ -283,9 +283,9 @@ public class TestIcebergTableOperations extends 
TestIcebergNamespaceOperations {
   }
 
   @ParameterizedTest
-  @ValueSource(booleans = {true, false})
-  void testRenameTable(boolean withPrefix) {
-    setUrlPathWithPrefix(withPrefix);
+  @ValueSource(strings = {"", IcebergRestTestUtil.PREFIX})
+  void testRenameTable(String prefix) {
+    setUrlPathWithPrefix(prefix);
     // namespace not exits
     verifyRenameTableFail("rename_foo1", "rename_foo3", 404);
 

Reply via email to