This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new d35ab31b6 [4958] feat(iceberg): support event listener for Iceberg
REST server (#5002)
d35ab31b6 is described below
commit d35ab31b661aa578dce612224212fff5f31619d5
Author: FANNG <[email protected]>
AuthorDate: Mon Oct 21 22:15:15 2024 +0800
[4958] feat(iceberg): support event listener for Iceberg REST server (#5002)
### What changes were proposed in this pull request?
1. Integrate the event listener system into the Iceberg REST server.
2. Only dispatch create-table events for now: `IcebergCreateTableEvent`,
`IcebergCreateTablePreEvent`, and `IcebergCreateTableFailureEvent`.
### Why are the changes needed?
Fix: #4958
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
1. Add configuration to register `IcebergEventLogger` as an event listener:
```
gravitino.eventListener.names = iceberg
gravitino.eventListener.iceberg.class =
org.apache.gravitino.iceberg.extension.IcebergEventLogger
```
2. Run a query and check that the event logs are printed.
---------
Co-authored-by: Qi Yu <[email protected]>
---
bundles/build.gradle.kts | 2 +-
.../lakehouse/iceberg/IcebergConstants.java | 3 +-
.../java/org/apache/gravitino/GravitinoEnv.java | 35 +++++++---
.../gravitino/iceberg/common/IcebergConfig.java | 16 +++--
.../common/ops/IcebergCatalogConfigProvider.java | 42 -----------
.../org/apache/gravitino/iceberg/RESTService.java | 27 +++++++-
.../iceberg/server/GravitinoIcebergRESTServer.java | 3 +-
.../service/IcebergCatalogWrapperManager.java | 78 ++++-----------------
.../iceberg/service/IcebergExceptionMapper.java | 3 +
.../iceberg/service/IcebergRestUtils.java | 55 +++++++++++++++
.../dispatcher/IcebergTableEventDispatcher.java | 81 ++++++++++++++++++++++
.../IcebergTableOperationDispatcher.java | 41 +++++++++++
.../dispatcher/IcebergTableOperationExecutor.java} | 25 ++++---
.../provider/DynamicIcebergConfigProvider.java} | 50 +++++++------
.../service/provider/IcebergConfigProvider.java | 57 +++++++++++++++
.../provider/IcebergConfigProviderFactory.java | 52 ++++++++++++++
.../provider/StaticIcebergConfigProvider.java} | 41 +++++------
.../service/rest/IcebergTableOperations.java | 21 ++++--
.../api/event/IcebergCreateTableEvent.java | 54 +++++++++++++++
.../api/event/IcebergCreateTableFailureEvent.java} | 22 +++---
.../api/event/IcebergCreateTablePreEvent.java} | 27 ++++----
.../listener/api/event/IcebergEvent.java} | 22 +++---
.../listener/api/event/IcebergFailureEvent.java} | 22 +++---
.../listener/api/event/IcebergPreEvent.java} | 22 +++---
.../listener/api/event/IcebergTableEvent.java} | 20 ++----
.../api/event/IcebergTableFailureEvent.java} | 22 +++---
.../listener/api/event/IcebergTablePreEvent.java} | 22 +++---
.../service/TestIcebergCatalogWrapperManager.java | 13 ++--
.../iceberg/service/TestIcebergRESTUtils.java | 77 ++++++++++++++++++++
.../TestDynamicIcebergConfigProvider.java} | 38 +++++-----
.../provider/TestStaticIcebergConfigProvider.java} | 38 +++++-----
.../rest/IcebergCatalogWrapperManagerForTest.java | 6 +-
.../iceberg/service/rest/IcebergRestTestUtil.java | 24 ++++++-
.../apache/gravitino/server/GravitinoServer.java | 2 +-
34 files changed, 716 insertions(+), 347 deletions(-)
diff --git a/bundles/build.gradle.kts b/bundles/build.gradle.kts
index 043fbfec6..fa6eb7d5e 100644
--- a/bundles/build.gradle.kts
+++ b/bundles/build.gradle.kts
@@ -19,4 +19,4 @@
tasks.all {
enabled = false
-}
\ No newline at end of file
+}
diff --git
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java
index 004bde0bd..52e665579 100644
---
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java
+++
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java
@@ -77,5 +77,6 @@ public class IcebergConstants {
public static final String GRAVITINO_METALAKE = "gravitino-metalake";
- public static final String GRAVITINO_DEFAULT_CATALOG =
"__gravitino_default_catalog";
+ public static final String ICEBERG_REST_DEFAULT_METALAKE = "gravitino";
+ public static final String ICEBERG_REST_DEFAULT_CATALOG = "default_catalog";
}
diff --git a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
index 516e9d9d3..859b74674 100644
--- a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
+++ b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
@@ -79,6 +79,8 @@ public class GravitinoEnv {
private static final Logger LOG =
LoggerFactory.getLogger(GravitinoEnv.class);
private Config config;
+ // Iceberg REST server use base components while Gravitino Server use full
components.
+ private boolean manageFullComponents = true;
private EntityStore entityStore;
@@ -133,21 +135,30 @@ public class GravitinoEnv {
}
/**
- * Initialize the Gravitino environment.
+ * Initialize base components, used for Iceberg REST server.
*
* @param config The configuration object to initialize the environment.
- * @param isGravitinoServer A boolean flag indicating whether the
initialization is for the
- * Gravitino server. If true, server-specific components will be
initialized in addition to
- * the base components.
*/
- public void initialize(Config config, boolean isGravitinoServer) {
- LOG.info("Initializing Gravitino Environment...");
+ public void initializeBaseComponents(Config config) {
+ LOG.info("Initializing Gravitino base environment...");
this.config = config;
+ this.manageFullComponents = false;
initBaseComponents();
- if (isGravitinoServer) {
- initGravitinoServerComponents();
- }
- LOG.info("Gravitino Environment is initialized.");
+ LOG.info("Gravitino base environment is initialized.");
+ }
+
+ /**
+ * Initialize all components, used for Gravitino server.
+ *
+ * @param config The configuration object to initialize the environment.
+ */
+ public void initializeFullComponents(Config config) {
+ LOG.info("Initializing Gravitino full environment...");
+ this.config = config;
+ this.manageFullComponents = true;
+ initBaseComponents();
+ initGravitinoServerComponents();
+ LOG.info("Gravitino full environment is initialized.");
}
/**
@@ -311,9 +322,11 @@ public class GravitinoEnv {
}
public void start() {
- auxServiceManager.serviceStart();
metricsSystem.start();
eventListenerManager.start();
+ if (manageFullComponents) {
+ auxServiceManager.serviceStart();
+ }
}
/** Shutdown the Gravitino environment. */
diff --git
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java
index 638b4172c..2e7eb74e2 100644
---
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java
+++
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java
@@ -54,7 +54,8 @@ public class IcebergConfig extends Config implements
OverwriteDefaultConfig {
public static final ConfigEntry<String> CATALOG_BACKEND_IMPL =
new ConfigBuilder(IcebergConstants.CATALOG_BACKEND_IMPL)
.doc(
- "The fully-qualified class name of a custom catalog
implementation, only worked if `catalog-backend` is `custom`")
+ "The fully-qualified class name of a custom catalog
implementation, "
+ + "only worked if `catalog-backend` is `custom`")
.version(ConfigConstants.VERSION_0_7_0)
.stringConf()
.create();
@@ -175,7 +176,8 @@ public class IcebergConfig extends Config implements
OverwriteDefaultConfig {
public static final ConfigEntry<Integer> ICEBERG_METRICS_STORE_RETAIN_DAYS =
new ConfigBuilder(IcebergConstants.ICEBERG_METRICS_STORE_RETAIN_DAYS)
.doc(
- "The retain days of Iceberg metrics, the value not greater than
0 means retain forever")
+ "The retain days of Iceberg metrics, the value not greater than
0 means "
+ + "retain forever")
.version(ConfigConstants.VERSION_0_4_0)
.intConf()
.createWithDefault(-1);
@@ -205,7 +207,9 @@ public class IcebergConfig extends Config implements
OverwriteDefaultConfig {
public static final ConfigEntry<String> ICEBERG_REST_CATALOG_CONFIG_PROVIDER
=
new ConfigBuilder(IcebergConstants.ICEBERG_REST_CATALOG_CONFIG_PROVIDER)
.doc(
- "Catalog provider class name, you can develop a class that
implements `IcebergCatalogConfigProvider` and add the corresponding jar file to
the Iceberg REST service classpath directory.")
+ "Catalog provider class name, you can develop a class that
implements "
+ + "`IcebergConfigProvider` and add the corresponding jar
file to the Iceberg "
+ + "REST service classpath directory.")
.version(ConfigConstants.VERSION_0_7_0)
.stringConf()
.createWithDefault(IcebergConstants.STATIC_ICEBERG_CATALOG_CONFIG_PROVIDER_NAME);
@@ -213,7 +217,8 @@ public class IcebergConfig extends Config implements
OverwriteDefaultConfig {
public static final ConfigEntry<String> GRAVITINO_URI =
new ConfigBuilder(IcebergConstants.GRAVITINO_URI)
.doc(
- "The uri of Gravitino server address, only worked if
`catalog-provider` is `gravitino-based-provider`.")
+ "The uri of Gravitino server address, only worked if
`catalog-provider` is "
+ + "`gravitino-based-provider`.")
.version(ConfigConstants.VERSION_0_7_0)
.stringConf()
.create();
@@ -221,7 +226,8 @@ public class IcebergConfig extends Config implements
OverwriteDefaultConfig {
public static final ConfigEntry<String> GRAVITINO_METALAKE =
new ConfigBuilder(IcebergConstants.GRAVITINO_METALAKE)
.doc(
- "The metalake name that `gravitino-based-provider` used to
request to Gravitino, only worked if `catalog-provider` is
`gravitino-based-provider`.")
+ "The metalake name that `gravitino-based-provider` used to
request to Gravitino, "
+ + "only worked if `catalog-provider` is
`gravitino-based-provider`.")
.version(ConfigConstants.VERSION_0_7_0)
.stringConf()
.create();
diff --git
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogConfigProvider.java
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogConfigProvider.java
deleted file mode 100644
index fc0d488a1..000000000
---
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogConfigProvider.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.gravitino.iceberg.common.ops;
-
-import java.util.Map;
-import java.util.Optional;
-import org.apache.gravitino.iceberg.common.IcebergConfig;
-
-/**
- * {@code IcebergCatalogConfigProvider} is an interface defining how Iceberg
REST catalog server
- * gets Iceberg catalog configurations.
- */
-public interface IcebergCatalogConfigProvider {
-
- /**
- * @param properties The parameters for creating Provider which from
configurations whose prefix
- * is 'gravitino.iceberg-rest.'
- */
- void initialize(Map<String, String> properties);
-
- /**
- * @param catalogName Iceberg catalog name.
- * @return the configuration of Iceberg catalog.
- */
- Optional<IcebergConfig> getIcebergCatalogConfig(String catalogName);
-}
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java
index 0592cfd94..b301204bd 100644
---
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java
@@ -28,7 +28,13 @@ import org.apache.gravitino.iceberg.common.IcebergConfig;
import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager;
import org.apache.gravitino.iceberg.service.IcebergExceptionMapper;
import org.apache.gravitino.iceberg.service.IcebergObjectMapperProvider;
+import
org.apache.gravitino.iceberg.service.dispatcher.IcebergTableEventDispatcher;
+import
org.apache.gravitino.iceberg.service.dispatcher.IcebergTableOperationDispatcher;
+import
org.apache.gravitino.iceberg.service.dispatcher.IcebergTableOperationExecutor;
import org.apache.gravitino.iceberg.service.metrics.IcebergMetricsManager;
+import org.apache.gravitino.iceberg.service.provider.IcebergConfigProvider;
+import
org.apache.gravitino.iceberg.service.provider.IcebergConfigProviderFactory;
+import org.apache.gravitino.listener.EventBus;
import org.apache.gravitino.metrics.MetricsSystem;
import org.apache.gravitino.metrics.source.MetricsSource;
import org.apache.gravitino.server.web.HttpServerMetricsSource;
@@ -54,6 +60,7 @@ public class RESTService implements GravitinoAuxiliaryService
{
private IcebergCatalogWrapperManager icebergCatalogWrapperManager;
private IcebergMetricsManager icebergMetricsManager;
+ private IcebergConfigProvider configProvider;
private void initServer(IcebergConfig icebergConfig) {
JettyServerConfig serverConfig =
JettyServerConfig.fromConfig(icebergConfig);
@@ -70,14 +77,27 @@ public class RESTService implements
GravitinoAuxiliaryService {
new
HttpServerMetricsSource(MetricsSource.ICEBERG_REST_SERVER_METRIC_NAME, config,
server);
metricsSystem.register(httpServerMetricsSource);
- icebergCatalogWrapperManager = new
IcebergCatalogWrapperManager(icebergConfig.getAllConfig());
- icebergMetricsManager = new IcebergMetricsManager(icebergConfig);
+ Map<String, String> configProperties = icebergConfig.getAllConfig();
+ this.configProvider =
IcebergConfigProviderFactory.create(configProperties);
+ configProvider.initialize(configProperties);
+ String metalakeName = configProvider.getMetalakeName();
+
+ EventBus eventBus = GravitinoEnv.getInstance().eventBus();
+ this.icebergCatalogWrapperManager =
+ new IcebergCatalogWrapperManager(configProperties, configProvider);
+ this.icebergMetricsManager = new IcebergMetricsManager(icebergConfig);
+ IcebergTableOperationExecutor icebergTableOperationExecutor =
+ new IcebergTableOperationExecutor(icebergCatalogWrapperManager);
+ IcebergTableEventDispatcher icebergTableEventDispatcher =
+ new IcebergTableEventDispatcher(icebergTableOperationExecutor,
eventBus, metalakeName);
+
config.register(
new AbstractBinder() {
@Override
protected void configure() {
bind(icebergCatalogWrapperManager).to(IcebergCatalogWrapperManager.class).ranked(1);
bind(icebergMetricsManager).to(IcebergMetricsManager.class).ranked(1);
+
bind(icebergTableEventDispatcher).to(IcebergTableOperationDispatcher.class).ranked(1);
}
});
@@ -118,6 +138,9 @@ public class RESTService implements
GravitinoAuxiliaryService {
server.stop();
LOG.info("Iceberg REST service stopped");
}
+ if (configProvider != null) {
+ configProvider.close();
+ }
if (icebergCatalogWrapperManager != null) {
icebergCatalogWrapperManager.close();
}
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/server/GravitinoIcebergRESTServer.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/server/GravitinoIcebergRESTServer.java
index 622f0d21a..35fe05ce3 100644
---
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/server/GravitinoIcebergRESTServer.java
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/server/GravitinoIcebergRESTServer.java
@@ -45,13 +45,14 @@ public class GravitinoIcebergRESTServer {
}
private void initialize() {
- gravitinoEnv.initialize(serverConfig, false);
+ gravitinoEnv.initializeBaseComponents(serverConfig);
icebergRESTService.serviceInit(
serverConfig.getConfigsWithPrefix(IcebergConfig.ICEBERG_CONFIG_PREFIX));
ServerAuthenticator.getInstance().initialize(serverConfig);
}
private void start() {
+ gravitinoEnv.start();
icebergRESTService.serviceStart();
}
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java
index 823f42ddb..cefc62bc2 100644
---
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java
@@ -22,46 +22,34 @@ import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Scheduler;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
-import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
import org.apache.gravitino.credential.CredentialProvider;
import org.apache.gravitino.credential.CredentialProviderFactory;
import org.apache.gravitino.credential.CredentialProviderManager;
import org.apache.gravitino.iceberg.common.IcebergConfig;
-import org.apache.gravitino.iceberg.common.ops.IcebergCatalogConfigProvider;
import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper;
-import
org.apache.gravitino.iceberg.provider.DynamicIcebergCatalogConfigProvider;
-import
org.apache.gravitino.iceberg.provider.StaticIcebergCatalogConfigProvider;
+import org.apache.gravitino.iceberg.service.provider.IcebergConfigProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class IcebergCatalogWrapperManager implements AutoCloseable {
public static final Logger LOG =
LoggerFactory.getLogger(IcebergCatalogWrapperManager.class);
- private static final ImmutableMap<String, String>
ICEBERG_CATALOG_CONFIG_PROVIDER_NAMES =
- ImmutableMap.of(
- IcebergConstants.STATIC_ICEBERG_CATALOG_CONFIG_PROVIDER_NAME,
- StaticIcebergCatalogConfigProvider.class.getCanonicalName(),
- IcebergConstants.DYNAMIC_ICEBERG_CATALOG_CONFIG_PROVIDER_NAME,
- DynamicIcebergCatalogConfigProvider.class.getCanonicalName());
-
private final Cache<String, IcebergCatalogWrapper>
icebergCatalogWrapperCache;
- private final IcebergCatalogConfigProvider provider;
+ private final IcebergConfigProvider configProvider;
private CredentialProviderManager credentialProviderManager;
- public IcebergCatalogWrapperManager(Map<String, String> properties) {
+ public IcebergCatalogWrapperManager(
+ Map<String, String> properties, IcebergConfigProvider configProvider) {
this.credentialProviderManager = new CredentialProviderManager();
- this.provider = createIcebergCatalogConfigProvider(properties);
- this.provider.initialize(properties);
+ this.configProvider = configProvider;
this.icebergCatalogWrapperCache =
Caffeine.newBuilder()
.expireAfterWrite(
@@ -92,17 +80,21 @@ public class IcebergCatalogWrapperManager implements
AutoCloseable {
* @return the instance of IcebergCatalogWrapper.
*/
public IcebergCatalogWrapper getOps(String rawPrefix) {
- String catalogName = getCatalogName(rawPrefix);
- IcebergCatalogWrapper tableOps =
+ String catalogName = IcebergRestUtils.getCatalogName(rawPrefix);
+ return getCatalogWrapper(catalogName);
+ }
+
+ public IcebergCatalogWrapper getCatalogWrapper(String catalogName) {
+ IcebergCatalogWrapper catalogWrapper =
icebergCatalogWrapperCache.get(catalogName, k ->
createCatalogWrapper(catalogName));
// Reload conf to reset UserGroupInformation or icebergTableOps will
always use
// Simple auth.
- tableOps.reloadHadoopConf();
- return tableOps;
+ catalogWrapper.reloadHadoopConf();
+ return catalogWrapper;
}
public CredentialProvider getCredentialProvider(String prefix) {
- String catalogName = getCatalogName(prefix);
+ String catalogName = IcebergRestUtils.getCatalogName(prefix);
return credentialProviderManager.getCredentialProvider(catalogName);
}
@@ -112,7 +104,7 @@ public class IcebergCatalogWrapperManager implements
AutoCloseable {
}
private IcebergCatalogWrapper createCatalogWrapper(String catalogName) {
- Optional<IcebergConfig> icebergConfig =
provider.getIcebergCatalogConfig(catalogName);
+ Optional<IcebergConfig> icebergConfig =
configProvider.getIcebergCatalogConfig(catalogName);
if (!icebergConfig.isPresent()) {
throw new RuntimeException("Couldn't find Iceberg configuration for " +
catalogName);
}
@@ -128,43 +120,6 @@ public class IcebergCatalogWrapperManager implements
AutoCloseable {
return createIcebergCatalogWrapper(icebergConfig.get());
}
- private String getCatalogName(String rawPrefix) {
- String prefix = shelling(rawPrefix);
- Preconditions.checkArgument(
- !IcebergConstants.GRAVITINO_DEFAULT_CATALOG.equals(prefix),
- String.format("%s is conflict with reserved key, please replace it",
prefix));
- if (StringUtils.isBlank(prefix)) {
- return IcebergConstants.GRAVITINO_DEFAULT_CATALOG;
- }
- return prefix;
- }
-
- private IcebergCatalogConfigProvider createIcebergCatalogConfigProvider(
- Map<String, String> properties) {
- String providerName =
- (new
IcebergConfig(properties)).get(IcebergConfig.ICEBERG_REST_CATALOG_CONFIG_PROVIDER);
- String className =
- ICEBERG_CATALOG_CONFIG_PROVIDER_NAMES.getOrDefault(providerName,
providerName);
- LOG.info("Load Iceberg catalog provider: {}.", className);
- try {
- Class<?> providerClz = Class.forName(className);
- return (IcebergCatalogConfigProvider)
providerClz.getDeclaredConstructor().newInstance();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- private String shelling(String rawPrefix) {
- if (StringUtils.isBlank(rawPrefix)) {
- return rawPrefix;
- } else {
- // rawPrefix is a string matching ([^/]*/) which end with /
- Preconditions.checkArgument(
- rawPrefix.endsWith("/"), String.format("rawPrefix %s format is
illegal", rawPrefix));
- return rawPrefix.substring(0, rawPrefix.length() - 1);
- }
- }
-
private void closeIcebergCatalogWrapper(IcebergCatalogWrapper
catalogWrapper) {
try {
catalogWrapper.close();
@@ -176,8 +131,5 @@ public class IcebergCatalogWrapperManager implements
AutoCloseable {
@Override
public void close() throws Exception {
icebergCatalogWrapperCache.invalidateAll();
- if (provider instanceof AutoCloseable) {
- ((AutoCloseable) provider).close();
- }
}
}
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergExceptionMapper.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergExceptionMapper.java
index f880f7f7a..ed7d0a2f9 100644
---
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergExceptionMapper.java
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergExceptionMapper.java
@@ -24,6 +24,7 @@ import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.ext.ExceptionMapper;
import javax.ws.rs.ext.Provider;
+import org.apache.gravitino.exceptions.IllegalNameIdentifierException;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
@@ -51,8 +52,10 @@ public class IcebergExceptionMapper implements
ExceptionMapper<Exception> {
ImmutableMap.<Class<? extends Exception>, Integer>builder()
.put(IllegalArgumentException.class, 400)
.put(ValidationException.class, 400)
+ .put(IllegalNameIdentifierException.class, 400)
.put(NamespaceNotEmptyException.class, 400)
.put(NotAuthorizedException.class, 401)
+ .put(org.apache.gravitino.exceptions.ForbiddenException.class, 403)
.put(ForbiddenException.class, 403)
.put(NoSuchNamespaceException.class, 404)
.put(NoSuchTableException.class, 404)
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergRestUtils.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergRestUtils.java
index fb0e8005c..af017b2de 100644
---
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergRestUtils.java
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergRestUtils.java
@@ -18,12 +18,21 @@
*/
package org.apache.gravitino.iceberg.service;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import java.io.IOException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
+import java.util.Arrays;
+import java.util.stream.Stream;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
+import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.responses.ErrorResponse;
public class IcebergRestUtils {
@@ -71,4 +80,50 @@ public class IcebergRestUtils {
}
return nextHourDateTime.atZone(ZoneId.systemDefault()).toInstant();
}
+
+ public static NameIdentifier getGravitinoNameIdentifier(
+ String metalakeName, String catalogName, TableIdentifier
icebergIdentifier) {
+ Stream<String> catalogNS =
+ Stream.concat(
+ Stream.of(metalakeName, catalogName),
+ Arrays.stream(icebergIdentifier.namespace().levels()));
+ String[] catalogNSTable =
+ Stream.concat(catalogNS,
Stream.of(icebergIdentifier.name())).toArray(String[]::new);
+ return NameIdentifier.of(catalogNSTable);
+ }
+
+ public static String getCatalogName(String rawPrefix) {
+ String catalogName = normalizePrefix(rawPrefix);
+ Preconditions.checkArgument(
+ !IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG.equals(catalogName),
+ String.format(
+ "%s is conflicted with reserved catalog name, please replace it",
catalogName));
+ if (StringUtils.isBlank(catalogName)) {
+ return IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG;
+ }
+ return catalogName;
+ }
+
+ public static <T> T cloneIcebergRESTObject(Object message, Class<T>
className) {
+ ObjectMapper icebergObjectMapper = IcebergObjectMapper.getInstance();
+ try {
+ byte[] values = icebergObjectMapper.writeValueAsBytes(message);
+ return icebergObjectMapper.readValue(values, className);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ // remove the last '/' from the prefix, for example transform
'iceberg_catalog/' to
+ // 'iceberg_catalog'
+ private static String normalizePrefix(String rawPrefix) {
+ if (StringUtils.isBlank(rawPrefix)) {
+ return rawPrefix;
+ } else {
+ // rawPrefix is a string matching ([^/]*/) which end with /
+ Preconditions.checkArgument(
+ rawPrefix.endsWith("/"), String.format("rawPrefix %s format is
illegal", rawPrefix));
+ return rawPrefix.substring(0, rawPrefix.length() - 1);
+ }
+ }
}
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableEventDispatcher.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableEventDispatcher.java
new file mode 100644
index 000000000..315ba6203
--- /dev/null
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableEventDispatcher.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.iceberg.service.dispatcher;
+
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.iceberg.service.IcebergRestUtils;
+import org.apache.gravitino.listener.EventBus;
+import org.apache.gravitino.listener.api.event.IcebergCreateTableEvent;
+import org.apache.gravitino.listener.api.event.IcebergCreateTableFailureEvent;
+import org.apache.gravitino.listener.api.event.IcebergCreateTablePreEvent;
+import org.apache.gravitino.utils.PrincipalUtils;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.rest.requests.CreateTableRequest;
+import org.apache.iceberg.rest.responses.LoadTableResponse;
+
+/**
+ * {@code IcebergTableEventDispatcher} is a decorator for {@link
IcebergTableOperationExecutor} that
+ * not only delegates table operations to the underlying dispatcher but also
dispatches
+ * corresponding events to an {@link org.apache.gravitino.listener.EventBus}.
+ */
+public class IcebergTableEventDispatcher implements
IcebergTableOperationDispatcher {
+
+ private IcebergTableOperationDispatcher icebergTableOperationDispatcher;
+ private EventBus eventBus;
+ private String metalakeName;
+
+ public IcebergTableEventDispatcher(
+ IcebergTableOperationDispatcher icebergTableOperationDispatcher,
+ EventBus eventBus,
+ String metalakeName) {
+ this.icebergTableOperationDispatcher = icebergTableOperationDispatcher;
+ this.eventBus = eventBus;
+ this.metalakeName = metalakeName;
+ }
+
+ @Override
+ public LoadTableResponse createTable(
+ String catalogName, Namespace namespace, CreateTableRequest
createTableRequest) {
+ TableIdentifier tableIdentifier = TableIdentifier.of(namespace,
createTableRequest.name());
+ NameIdentifier nameIdentifier =
+ IcebergRestUtils.getGravitinoNameIdentifier(metalakeName, catalogName,
tableIdentifier);
+ eventBus.dispatchEvent(
+ new IcebergCreateTablePreEvent(
+ PrincipalUtils.getCurrentUserName(), nameIdentifier,
createTableRequest));
+ LoadTableResponse loadTableResponse;
+ try {
+ loadTableResponse =
+ icebergTableOperationDispatcher.createTable(catalogName, namespace,
createTableRequest);
+ } catch (Exception e) {
+ eventBus.dispatchEvent(
+ new IcebergCreateTableFailureEvent(
+ PrincipalUtils.getCurrentUserName(), nameIdentifier, e));
+ throw e;
+ }
+ eventBus.dispatchEvent(
+ new IcebergCreateTableEvent(
+ PrincipalUtils.getCurrentUserName(),
+ nameIdentifier,
+ createTableRequest,
+ loadTableResponse));
+ return loadTableResponse;
+ }
+}
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationDispatcher.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationDispatcher.java
new file mode 100644
index 000000000..948e48662
--- /dev/null
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationDispatcher.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.iceberg.service.dispatcher;
+
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.rest.requests.CreateTableRequest;
+import org.apache.iceberg.rest.responses.LoadTableResponse;
+
+/**
+ * The {@code IcebergTableOperationDispatcher} interface defines the public API for managing
+ * Iceberg tables.
+ */
+public interface IcebergTableOperationDispatcher {
+ /**
+ * Creates a new Iceberg table.
+ *
+ * @param catalogName The name of the catalog in which the table is created.
+ * @param namespace The namespace within which the table should be created.
+ * @param createTableRequest The request object containing the details for creating the table.
+ * @return A {@link LoadTableResponse} object containing the result of the operation.
+ */
+ LoadTableResponse createTable(
+ String catalogName, Namespace namespace, CreateTableRequest
createTableRequest);
+}
diff --git
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergCatalogWrapperManagerForTest.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationExecutor.java
similarity index 53%
copy from
iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergCatalogWrapperManagerForTest.java
copy to
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationExecutor.java
index 7d359926a..9a51d7b7a 100644
---
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergCatalogWrapperManagerForTest.java
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationExecutor.java
@@ -17,21 +17,26 @@
* under the License.
*/
-package org.apache.gravitino.iceberg.service.rest;
+package org.apache.gravitino.iceberg.service.dispatcher;
-import java.util.Map;
-import org.apache.gravitino.iceberg.common.IcebergConfig;
-import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper;
import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.rest.requests.CreateTableRequest;
+import org.apache.iceberg.rest.responses.LoadTableResponse;
-// Provide a custom catalogWrapper to do test like `registerTable`
-public class IcebergCatalogWrapperManagerForTest extends
IcebergCatalogWrapperManager {
- public IcebergCatalogWrapperManagerForTest(Map<String, String> properties) {
- super(properties);
+public class IcebergTableOperationExecutor implements
IcebergTableOperationDispatcher {
+
+ private IcebergCatalogWrapperManager icebergCatalogWrapperManager;
+
+ public IcebergTableOperationExecutor(IcebergCatalogWrapperManager
icebergCatalogWrapperManager) {
+ this.icebergCatalogWrapperManager = icebergCatalogWrapperManager;
}
@Override
- public IcebergCatalogWrapper createIcebergCatalogWrapper(IcebergConfig
icebergConfig) {
- return new IcebergCatalogWrapperForTest(icebergConfig);
+ public LoadTableResponse createTable(
+ String catalogName, Namespace namespace, CreateTableRequest
createTableRequest) {
+ return icebergCatalogWrapperManager
+ .getCatalogWrapper(catalogName)
+ .createTable(namespace, createTableRequest);
}
}
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/DynamicIcebergCatalogConfigProvider.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/DynamicIcebergConfigProvider.java
similarity index 62%
rename from
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/DynamicIcebergCatalogConfigProvider.java
rename to
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/DynamicIcebergConfigProvider.java
index 4965f4bc1..0f35fae52 100644
---
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/DynamicIcebergCatalogConfigProvider.java
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/DynamicIcebergConfigProvider.java
@@ -1,22 +1,22 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-package org.apache.gravitino.iceberg.provider;
+package org.apache.gravitino.iceberg.service.provider;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -29,9 +29,6 @@ import
org.apache.gravitino.catalog.lakehouse.iceberg.IcebergPropertiesUtils;
import org.apache.gravitino.client.GravitinoAdminClient;
import org.apache.gravitino.exceptions.NoSuchCatalogException;
import org.apache.gravitino.iceberg.common.IcebergConfig;
-import org.apache.gravitino.iceberg.common.ops.IcebergCatalogConfigProvider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* This provider proxy Gravitino lakehouse-iceberg catalogs.
@@ -40,11 +37,7 @@ import org.slf4j.LoggerFactory;
*
* <p>The catalogName is iceberg_catalog
*/
-public class DynamicIcebergCatalogConfigProvider
- implements IcebergCatalogConfigProvider, AutoCloseable {
- public static final Logger LOG =
- LoggerFactory.getLogger(DynamicIcebergCatalogConfigProvider.class);
-
+public class DynamicIcebergConfigProvider implements IcebergConfigProvider {
private String gravitinoMetalake;
private GravitinoAdminClient client;
@@ -68,8 +61,8 @@ public class DynamicIcebergCatalogConfigProvider
Preconditions.checkArgument(
StringUtils.isNotBlank(catalogName), "blank catalogName is illegal");
Preconditions.checkArgument(
- !IcebergConstants.GRAVITINO_DEFAULT_CATALOG.equals(catalogName),
- IcebergConstants.GRAVITINO_DEFAULT_CATALOG + " is illegal in
gravitino-based-provider");
+ !IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG.equals(catalogName),
+ IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG + " is illegal in
gravitino-based-provider");
Catalog catalog;
try {
@@ -93,9 +86,14 @@ public class DynamicIcebergCatalogConfigProvider
}
@Override
- public void close() throws Exception {
+ public void close() {
if (client != null) {
client.close();
}
}
+
+ @Override
+ public String getMetalakeName() {
+ return gravitinoMetalake;
+ }
}
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/IcebergConfigProvider.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/IcebergConfigProvider.java
new file mode 100644
index 000000000..55d9793fb
--- /dev/null
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/IcebergConfigProvider.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.iceberg.service.provider;
+
+import java.io.Closeable;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
+import org.apache.gravitino.iceberg.common.IcebergConfig;
+
+/**
+ * {@code IcebergConfigProvider} is an interface defining how the Iceberg REST catalog server
+ * gets Iceberg catalog configurations.
+ */
+public interface IcebergConfigProvider extends Closeable {
+
+ /**
+ * Initializes the {@code IcebergConfigProvider} with properties.
+ *
+ * @param properties The parameters for creating the provider, taken from the configurations
+ * whose prefix is 'gravitino.iceberg-rest.'
+ */
+ void initialize(Map<String, String> properties);
+
+ /**
+ * Gets the Iceberg configuration for a catalog name.
+ *
+ * @param catalogName Iceberg catalog name.
+ * @return an {@link Optional} wrapping the configuration of the Iceberg catalog.
+ */
+ Optional<IcebergConfig> getIcebergCatalogConfig(String catalogName);
+
+ /**
+ * Gets the metalake name.
+ *
+ * @return the name of the metalake; the default implementation returns {@code
+ * IcebergConstants.ICEBERG_REST_DEFAULT_METALAKE}.
+ */
+ default String getMetalakeName() {
+ return IcebergConstants.ICEBERG_REST_DEFAULT_METALAKE;
+ }
+}
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/IcebergConfigProviderFactory.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/IcebergConfigProviderFactory.java
new file mode 100644
index 000000000..f21759031
--- /dev/null
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/IcebergConfigProviderFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.iceberg.service.provider;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
+import org.apache.gravitino.iceberg.common.IcebergConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Factory that instantiates the {@link IcebergConfigProvider} selected by the configuration. */
+public class IcebergConfigProviderFactory {
+  public static final Logger LOG = LoggerFactory.getLogger(IcebergConfigProviderFactory.class);
+
+  // Maps the built-in short provider names to the class names implementing them.
+  private static final ImmutableMap<String, String> ICEBERG_CATALOG_CONFIG_PROVIDER_NAMES =
+      ImmutableMap.of(
+          IcebergConstants.STATIC_ICEBERG_CATALOG_CONFIG_PROVIDER_NAME,
+          StaticIcebergConfigProvider.class.getCanonicalName(),
+          IcebergConstants.DYNAMIC_ICEBERG_CATALOG_CONFIG_PROVIDER_NAME,
+          DynamicIcebergConfigProvider.class.getCanonicalName());
+
+  /**
+   * Creates the config provider named by {@code IcebergConfig.ICEBERG_REST_CATALOG_CONFIG_PROVIDER}.
+   *
+   * <p>The configured value is first looked up among the built-in provider names; a value that
+   * is not a built-in name is used as a class name directly. The class is instantiated through
+   * its no-argument constructor. This method does not call {@code initialize} on the provider.
+   *
+   * @param properties The properties used to build the {@link IcebergConfig} that holds the
+   *     provider setting.
+   * @return A new provider instance.
+   * @throws RuntimeException if the class cannot be loaded, is not an {@link
+   *     IcebergConfigProvider}, or cannot be instantiated; the original failure is the cause.
+   */
+  public static IcebergConfigProvider create(Map<String, String> properties) {
+    String providerName =
+        (new IcebergConfig(properties)).get(IcebergConfig.ICEBERG_REST_CATALOG_CONFIG_PROVIDER);
+    String className =
+        ICEBERG_CATALOG_CONFIG_PROVIDER_NAMES.getOrDefault(providerName, providerName);
+    LOG.info("Load Iceberg catalog provider: {}.", className);
+    try {
+      // asSubclass rejects a class of the wrong type before any instance of it is constructed.
+      Class<? extends IcebergConfigProvider> providerClz =
+          Class.forName(className).asSubclass(IcebergConfigProvider.class);
+      return providerClz.getDeclaredConstructor().newInstance();
+    } catch (Exception e) {
+      // Name the offending class so a misconfiguration can be diagnosed from the message alone.
+      throw new RuntimeException("Failed to create Iceberg config provider: " + className, e);
+    }
+  }
+}
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/StaticIcebergCatalogConfigProvider.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/StaticIcebergConfigProvider.java
similarity index 64%
rename from
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/StaticIcebergCatalogConfigProvider.java
rename to
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/StaticIcebergConfigProvider.java
index aa7f10321..f8c4fb350 100644
---
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/StaticIcebergCatalogConfigProvider.java
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/StaticIcebergConfigProvider.java
@@ -1,22 +1,22 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-package org.apache.gravitino.iceberg.provider;
+package org.apache.gravitino.iceberg.service.provider;
import com.google.common.annotations.VisibleForTesting;
import java.util.Map;
@@ -24,7 +24,6 @@ import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
import org.apache.gravitino.iceberg.common.IcebergConfig;
-import org.apache.gravitino.iceberg.common.ops.IcebergCatalogConfigProvider;
import org.apache.gravitino.utils.MapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,9 +38,8 @@ import org.slf4j.LoggerFactory;
* gravitino.iceberg-rest.catalog.hive_proxy.catalog-backend = hive
* gravitino.iceberg-rest.catalog.hive_proxy.uri = thrift://{host}:{port} ...
*/
-public class StaticIcebergCatalogConfigProvider implements
IcebergCatalogConfigProvider {
- public static final Logger LOG =
- LoggerFactory.getLogger(StaticIcebergCatalogConfigProvider.class);
+public class StaticIcebergConfigProvider implements IcebergConfigProvider {
+ public static final Logger LOG =
LoggerFactory.getLogger(StaticIcebergConfigProvider.class);
@VisibleForTesting Map<String, IcebergConfig> catalogConfigs;
@@ -61,7 +59,7 @@ public class StaticIcebergCatalogConfigProvider implements
IcebergCatalogConfigP
MapUtils.getPrefixMap(
properties, String.format("catalog.%s.",
catalogName)))));
this.catalogConfigs.put(
- IcebergConstants.GRAVITINO_DEFAULT_CATALOG, new
IcebergConfig(properties));
+ IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG, new
IcebergConfig(properties));
}
@Override
@@ -69,6 +67,9 @@ public class StaticIcebergCatalogConfigProvider implements
IcebergCatalogConfigP
return Optional.ofNullable(catalogConfigs.get(catalogName));
}
+ @Override
+ public void close() {}
+
private Optional<String> getCatalogName(String catalogConfigKey) {
if (!catalogConfigKey.startsWith("catalog.")) {
return Optional.empty();
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java
index 33023343e..cebb74884 100644
---
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java
@@ -50,8 +50,10 @@ import org.apache.gravitino.credential.CredentialUtils;
import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager;
import org.apache.gravitino.iceberg.service.IcebergObjectMapper;
import org.apache.gravitino.iceberg.service.IcebergRestUtils;
+import
org.apache.gravitino.iceberg.service.dispatcher.IcebergTableOperationDispatcher;
import org.apache.gravitino.iceberg.service.metrics.IcebergMetricsManager;
import org.apache.gravitino.metrics.MetricNames;
+import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.ServiceUnavailableException;
import org.apache.iceberg.rest.RESTUtil;
@@ -76,6 +78,7 @@ public class IcebergTableOperations {
private IcebergMetricsManager icebergMetricsManager;
private ObjectMapper icebergObjectMapper;
+ private IcebergTableOperationDispatcher tableOperationDispatcher;
@SuppressWarnings("UnusedVariable")
@Context
@@ -84,10 +87,12 @@ public class IcebergTableOperations {
@Inject
public IcebergTableOperations(
IcebergCatalogWrapperManager icebergCatalogWrapperManager,
- IcebergMetricsManager icebergMetricsManager) {
+ IcebergMetricsManager icebergMetricsManager,
+ IcebergTableOperationDispatcher tableOperationDispatcher) {
this.icebergCatalogWrapperManager = icebergCatalogWrapperManager;
- this.icebergObjectMapper = IcebergObjectMapper.getInstance();
this.icebergMetricsManager = icebergMetricsManager;
+ this.tableOperationDispatcher = tableOperationDispatcher;
+ this.icebergObjectMapper = IcebergObjectMapper.getInstance();
}
@GET
@@ -110,16 +115,18 @@ public class IcebergTableOperations {
CreateTableRequest createTableRequest,
@HeaderParam(X_ICEBERG_ACCESS_DELEGATION) String accessDelegation) {
boolean isCredentialVending = isCredentialVending(accessDelegation);
+ String catalogName = IcebergRestUtils.getCatalogName(prefix);
+ Namespace icebergNS = RESTUtil.decodeNamespace(namespace);
LOG.info(
- "Create Iceberg table, namespace: {}, create table request: {},
accessDelegation: {}, isCredentialVending: {}",
- namespace,
+ "Create Iceberg table, catalog: {}, namespace: {}, create table
request: {}, "
+ + "accessDelegation: {}, isCredentialVending: {}",
+ catalogName,
+ icebergNS,
createTableRequest,
accessDelegation,
isCredentialVending);
LoadTableResponse loadTableResponse =
- icebergCatalogWrapperManager
- .getOps(prefix)
- .createTable(RESTUtil.decodeNamespace(namespace),
createTableRequest);
+ tableOperationDispatcher.createTable(catalogName, icebergNS,
createTableRequest);
if (isCredentialVending) {
return IcebergRestUtils.ok(injectCredentialConfig(prefix,
loadTableResponse));
} else {
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTableEvent.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTableEvent.java
new file mode 100644
index 000000000..1ce2d8f77
--- /dev/null
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTableEvent.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.listener.api.event;
+
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.iceberg.service.IcebergRestUtils;
+import org.apache.iceberg.rest.requests.CreateTableRequest;
+import org.apache.iceberg.rest.responses.LoadTableResponse;
+
+/** Represents an event dispatched after an Iceberg table is created successfully. */
+@DeveloperApi
+public class IcebergCreateTableEvent extends IcebergTableEvent {
+
+  // Assigned once in the constructor; final so the payload cannot be rebound after the event
+  // has been handed to listeners.
+  private final CreateTableRequest createTableRequest;
+  private final LoadTableResponse loadTableResponse;
+
+  /**
+   * Creates the event. The request and response are not stored directly; the event keeps the
+   * objects returned by {@code IcebergRestUtils.cloneIcebergRESTObject}.
+   *
+   * @param user The user who created the table.
+   * @param resourceIdentifier The name identifier of the created table.
+   * @param createTableRequest The request that was used to create the table.
+   * @param loadTableResponse The response returned by the create table operation.
+   */
+  public IcebergCreateTableEvent(
+      String user,
+      NameIdentifier resourceIdentifier,
+      CreateTableRequest createTableRequest,
+      LoadTableResponse loadTableResponse) {
+    super(user, resourceIdentifier);
+    this.createTableRequest =
+        IcebergRestUtils.cloneIcebergRESTObject(createTableRequest, CreateTableRequest.class);
+    this.loadTableResponse =
+        IcebergRestUtils.cloneIcebergRESTObject(loadTableResponse, LoadTableResponse.class);
+  }
+
+  /**
+   * Returns the create table request held by this event.
+   *
+   * @return The {@link CreateTableRequest} stored at construction time.
+   */
+  public CreateTableRequest createTableRequest() {
+    return createTableRequest;
+  }
+
+  /**
+   * Returns the load table response held by this event.
+   *
+   * @return The {@link LoadTableResponse} stored at construction time.
+   */
+  public LoadTableResponse loadTableResponse() {
+    return loadTableResponse;
+  }
+}
diff --git
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergCatalogWrapperManagerForTest.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTableFailureEvent.java
similarity index 54%
copy from
iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergCatalogWrapperManagerForTest.java
copy to
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTableFailureEvent.java
index 7d359926a..24f74da4f 100644
---
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergCatalogWrapperManagerForTest.java
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTableFailureEvent.java
@@ -17,21 +17,15 @@
* under the License.
*/
-package org.apache.gravitino.iceberg.service.rest;
+package org.apache.gravitino.listener.api.event;
-import java.util.Map;
-import org.apache.gravitino.iceberg.common.IcebergConfig;
-import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper;
-import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
-// Provide a custom catalogWrapper to do test like `registerTable`
-public class IcebergCatalogWrapperManagerForTest extends
IcebergCatalogWrapperManager {
- public IcebergCatalogWrapperManagerForTest(Map<String, String> properties) {
- super(properties);
- }
-
- @Override
- public IcebergCatalogWrapper createIcebergCatalogWrapper(IcebergConfig
icebergConfig) {
- return new IcebergCatalogWrapperForTest(icebergConfig);
+/** Represents a failure event raised when creating an Iceberg table fails. */
+@DeveloperApi
+public class IcebergCreateTableFailureEvent extends IcebergTableFailureEvent {
+ public IcebergCreateTableFailureEvent(String user, NameIdentifier
nameIdentifier, Exception e) {
+ super(user, nameIdentifier, e);
}
}
diff --git
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergCatalogWrapperManagerForTest.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTablePreEvent.java
similarity index 54%
copy from
iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergCatalogWrapperManagerForTest.java
copy to
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTablePreEvent.java
index 7d359926a..81937e501 100644
---
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergCatalogWrapperManagerForTest.java
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTablePreEvent.java
@@ -17,21 +17,24 @@
* under the License.
*/
-package org.apache.gravitino.iceberg.service.rest;
+package org.apache.gravitino.listener.api.event;
-import java.util.Map;
-import org.apache.gravitino.iceberg.common.IcebergConfig;
-import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper;
-import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.iceberg.rest.requests.CreateTableRequest;
-// Provide a custom catalogWrapper to do test like `registerTable`
-public class IcebergCatalogWrapperManagerForTest extends
IcebergCatalogWrapperManager {
- public IcebergCatalogWrapperManagerForTest(Map<String, String> properties) {
- super(properties);
+/** Represents a pre-event raised before an Iceberg table is created. */
+@DeveloperApi
+public class IcebergCreateTablePreEvent extends IcebergTablePreEvent {
+ private CreateTableRequest createTableRequest;
+
+ public IcebergCreateTablePreEvent(
+ String user, NameIdentifier resourceIdentifier, CreateTableRequest
createTableRequest) {
+ super(user, resourceIdentifier);
+ this.createTableRequest = createTableRequest;
}
- @Override
- public IcebergCatalogWrapper createIcebergCatalogWrapper(IcebergConfig
icebergConfig) {
- return new IcebergCatalogWrapperForTest(icebergConfig);
+ public CreateTableRequest createTableRequest() {
+ return createTableRequest;
}
}
diff --git
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergCatalogWrapperManagerForTest.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergEvent.java
similarity index 54%
copy from
iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergCatalogWrapperManagerForTest.java
copy to
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergEvent.java
index 7d359926a..50ec07863 100644
---
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergCatalogWrapperManagerForTest.java
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergEvent.java
@@ -17,21 +17,15 @@
* under the License.
*/
-package org.apache.gravitino.iceberg.service.rest;
+package org.apache.gravitino.listener.api.event;
-import java.util.Map;
-import org.apache.gravitino.iceberg.common.IcebergConfig;
-import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper;
-import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
-// Provide a custom catalogWrapper to do test like `registerTable`
-public class IcebergCatalogWrapperManagerForTest extends
IcebergCatalogWrapperManager {
- public IcebergCatalogWrapperManagerForTest(Map<String, String> properties) {
- super(properties);
- }
-
- @Override
- public IcebergCatalogWrapper createIcebergCatalogWrapper(IcebergConfig
icebergConfig) {
- return new IcebergCatalogWrapperForTest(icebergConfig);
+/** Represents an abstract post event in Gravitino Iceberg REST server. */
+@DeveloperApi
+public abstract class IcebergEvent extends Event {
+ protected IcebergEvent(String user, NameIdentifier resourceIdentifier) {
+ super(user, resourceIdentifier);
}
}
diff --git
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergCatalogWrapperManagerForTest.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergFailureEvent.java
similarity index 54%
copy from
iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergCatalogWrapperManagerForTest.java
copy to
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergFailureEvent.java
index 7d359926a..e166bfdc9 100644
---
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergCatalogWrapperManagerForTest.java
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergFailureEvent.java
@@ -17,21 +17,15 @@
* under the License.
*/
-package org.apache.gravitino.iceberg.service.rest;
+package org.apache.gravitino.listener.api.event;
-import java.util.Map;
-import org.apache.gravitino.iceberg.common.IcebergConfig;
-import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper;
-import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
-// Provide a custom catalogWrapper to do test like `registerTable`
-public class IcebergCatalogWrapperManagerForTest extends
IcebergCatalogWrapperManager {
- public IcebergCatalogWrapperManagerForTest(Map<String, String> properties) {
- super(properties);
- }
-
- @Override
- public IcebergCatalogWrapper createIcebergCatalogWrapper(IcebergConfig
icebergConfig) {
- return new IcebergCatalogWrapperForTest(icebergConfig);
+/** Represents an abstract failure event in Gravitino Iceberg REST server. */
+@DeveloperApi
+public abstract class IcebergFailureEvent extends FailureEvent {
+ protected IcebergFailureEvent(String user, NameIdentifier nameIdentifier,
Exception e) {
+ super(user, nameIdentifier, e);
}
}
diff --git
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergCatalogWrapperManagerForTest.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergPreEvent.java
similarity index 54%
copy from
iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergCatalogWrapperManagerForTest.java
copy to
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergPreEvent.java
index 7d359926a..e57edde65 100644
---
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergCatalogWrapperManagerForTest.java
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergPreEvent.java
@@ -17,21 +17,15 @@
* under the License.
*/
-package org.apache.gravitino.iceberg.service.rest;
+package org.apache.gravitino.listener.api.event;
-import java.util.Map;
-import org.apache.gravitino.iceberg.common.IcebergConfig;
-import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper;
-import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
-// Provide a custom catalogWrapper to do test like `registerTable`
-public class IcebergCatalogWrapperManagerForTest extends
IcebergCatalogWrapperManager {
- public IcebergCatalogWrapperManagerForTest(Map<String, String> properties) {
- super(properties);
- }
-
- @Override
- public IcebergCatalogWrapper createIcebergCatalogWrapper(IcebergConfig
icebergConfig) {
- return new IcebergCatalogWrapperForTest(icebergConfig);
+/** Represents an abstract pre event in Gravitino Iceberg REST server. */
+@DeveloperApi
+public abstract class IcebergPreEvent extends PreEvent {
+ protected IcebergPreEvent(String user, NameIdentifier resourceIdentifier) {
+ super(user, resourceIdentifier);
}
}
diff --git
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergCatalogWrapperManagerForTest.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTableEvent.java
similarity index 54%
copy from
iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergCatalogWrapperManagerForTest.java
copy to
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTableEvent.java
index 7d359926a..e94a1938c 100644
---
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergCatalogWrapperManagerForTest.java
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTableEvent.java
@@ -17,21 +17,13 @@
* under the License.
*/
-package org.apache.gravitino.iceberg.service.rest;
+package org.apache.gravitino.listener.api.event;
-import java.util.Map;
-import org.apache.gravitino.iceberg.common.IcebergConfig;
-import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper;
-import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager;
+import org.apache.gravitino.NameIdentifier;
-// Provide a custom catalogWrapper to do test like `registerTable`
-public class IcebergCatalogWrapperManagerForTest extends
IcebergCatalogWrapperManager {
- public IcebergCatalogWrapperManagerForTest(Map<String, String> properties) {
- super(properties);
- }
-
- @Override
- public IcebergCatalogWrapper createIcebergCatalogWrapper(IcebergConfig
icebergConfig) {
- return new IcebergCatalogWrapperForTest(icebergConfig);
+/** Represents an abstract table post event in Gravitino Iceberg REST server.
*/
+public abstract class IcebergTableEvent extends IcebergEvent {
+ protected IcebergTableEvent(String user, NameIdentifier resourceIdentifier) {
+ super(user, resourceIdentifier);
}
}
diff --git
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergCatalogWrapperManagerForTest.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTableFailureEvent.java
similarity index 54%
copy from
iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergCatalogWrapperManagerForTest.java
copy to
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTableFailureEvent.java
index 7d359926a..f052b0060 100644
---
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergCatalogWrapperManagerForTest.java
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTableFailureEvent.java
@@ -17,21 +17,15 @@
* under the License.
*/
-package org.apache.gravitino.iceberg.service.rest;
+package org.apache.gravitino.listener.api.event;
-import java.util.Map;
-import org.apache.gravitino.iceberg.common.IcebergConfig;
-import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper;
-import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
-// Provide a custom catalogWrapper to do test like `registerTable`
-public class IcebergCatalogWrapperManagerForTest extends
IcebergCatalogWrapperManager {
- public IcebergCatalogWrapperManagerForTest(Map<String, String> properties) {
- super(properties);
- }
-
- @Override
- public IcebergCatalogWrapper createIcebergCatalogWrapper(IcebergConfig
icebergConfig) {
- return new IcebergCatalogWrapperForTest(icebergConfig);
+/** Represents a failure event raised when an Iceberg table operation fails. */
+@DeveloperApi
+public class IcebergTableFailureEvent extends IcebergFailureEvent {
+ protected IcebergTableFailureEvent(String user, NameIdentifier
nameIdentifier, Exception e) {
+ super(user, nameIdentifier, e);
}
}
diff --git
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergCatalogWrapperManagerForTest.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTablePreEvent.java
similarity index 54%
copy from
iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergCatalogWrapperManagerForTest.java
copy to
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTablePreEvent.java
index 7d359926a..486c6f094 100644
---
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergCatalogWrapperManagerForTest.java
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTablePreEvent.java
@@ -17,21 +17,15 @@
* under the License.
*/
-package org.apache.gravitino.iceberg.service.rest;
+package org.apache.gravitino.listener.api.event;
-import java.util.Map;
-import org.apache.gravitino.iceberg.common.IcebergConfig;
-import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper;
-import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
-// Provide a custom catalogWrapper to do test like `registerTable`
-public class IcebergCatalogWrapperManagerForTest extends
IcebergCatalogWrapperManager {
- public IcebergCatalogWrapperManagerForTest(Map<String, String> properties) {
- super(properties);
- }
-
- @Override
- public IcebergCatalogWrapper createIcebergCatalogWrapper(IcebergConfig
icebergConfig) {
- return new IcebergCatalogWrapperForTest(icebergConfig);
+/** Represents an abstract table pre event in Gravitino Iceberg REST server. */
+@DeveloperApi
+public abstract class IcebergTablePreEvent extends IcebergPreEvent {
+ protected IcebergTablePreEvent(String user, NameIdentifier
resourceIdentifier) {
+ super(user, resourceIdentifier);
}
}
diff --git
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergCatalogWrapperManager.java
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergCatalogWrapperManager.java
index 328bafa62..85a7fdc04 100644
---
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergCatalogWrapperManager.java
+++
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergCatalogWrapperManager.java
@@ -22,6 +22,8 @@ import com.google.common.collect.Maps;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper;
+import org.apache.gravitino.iceberg.service.provider.IcebergConfigProvider;
+import
org.apache.gravitino.iceberg.service.provider.IcebergConfigProviderFactory;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -39,7 +41,9 @@ public class TestIcebergCatalogWrapperManager {
}
Map<String, String> config = Maps.newHashMap();
config.put(String.format("catalog.%s.catalog-backend-name", prefix),
prefix);
- IcebergCatalogWrapperManager manager = new
IcebergCatalogWrapperManager(config);
+ IcebergConfigProvider configProvider =
IcebergConfigProviderFactory.create(config);
+ configProvider.initialize(config);
+ IcebergCatalogWrapperManager manager = new
IcebergCatalogWrapperManager(config, configProvider);
IcebergCatalogWrapper ops = manager.getOps(rawPrefix);
@@ -51,11 +55,12 @@ public class TestIcebergCatalogWrapperManager {
}
@ParameterizedTest
- @ValueSource(
- strings = {"hello", "\\\n\t\\\'", "\u0024", "\100", "[_~",
"__gravitino_default_catalog/"})
+ @ValueSource(strings = {"hello", "\\\n\t\\\'", "\u0024", "\100", "[_~",
"default_catalog/"})
public void testInvalidGetOps(String rawPrefix) {
Map<String, String> config = Maps.newHashMap();
- IcebergCatalogWrapperManager manager = new
IcebergCatalogWrapperManager(config);
+ IcebergConfigProvider configProvider =
IcebergConfigProviderFactory.create(config);
+ configProvider.initialize(config);
+ IcebergCatalogWrapperManager manager = new
IcebergCatalogWrapperManager(config, configProvider);
Assertions.assertThrowsExactly(IllegalArgumentException.class, () ->
manager.getOps(rawPrefix));
}
diff --git
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergRESTUtils.java
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergRESTUtils.java
new file mode 100644
index 000000000..79a317e52
--- /dev/null
+++
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergRESTUtils.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.iceberg.service;
+
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.rest.requests.CreateTableRequest;
+import org.apache.iceberg.types.Types.IntegerType;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StringType;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestIcebergRESTUtils {
+
+ @Test
+ void testGetGravitinoNameIdentifier() {
+ String metalakeName = "metalake";
+ String catalogName = "catalog";
+ TableIdentifier tableIdentifier = TableIdentifier.of("ns1", "ns2",
"table");
+ NameIdentifier nameIdentifier =
+ IcebergRestUtils.getGravitinoNameIdentifier(metalakeName, catalogName,
tableIdentifier);
+ Assertions.assertEquals(
+ NameIdentifier.of(metalakeName, catalogName, "ns1", "ns2", "table"),
nameIdentifier);
+ }
+
+ @Test
+ void testGetCatalogName() {
+ String prefix = "catalog/";
+ Assertions.assertEquals("catalog",
IcebergRestUtils.getCatalogName(prefix));
+ Assertions.assertEquals(
+ IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG,
IcebergRestUtils.getCatalogName(""));
+ Assertions.assertThrowsExactly(
+ IllegalArgumentException.class,
+ () ->
IcebergRestUtils.getCatalogName(IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG +
"/"));
+ }
+
+ @Test
+ void testSerdeIcebergRESTObject() {
+ Schema tableSchema =
+ new Schema(
+ NestedField.of(1, false, "foo1", StringType.get()),
+ NestedField.of(2, true, "foo2", IntegerType.get()));
+ CreateTableRequest createTableRequest =
+
CreateTableRequest.builder().withName("table").withSchema(tableSchema).build();
+ CreateTableRequest clonedIcebergRESTObject =
+ IcebergRestUtils.cloneIcebergRESTObject(createTableRequest,
CreateTableRequest.class);
+ Assertions.assertEquals(createTableRequest.name(),
clonedIcebergRESTObject.name());
+ Assertions.assertEquals(
+ createTableRequest.schema().columns().size(),
+ clonedIcebergRESTObject.schema().columns().size());
+ for (int i = 0; i < createTableRequest.schema().columns().size(); i++) {
+ NestedField field = createTableRequest.schema().columns().get(i);
+ NestedField clonedField =
clonedIcebergRESTObject.schema().columns().get(i);
+ Assertions.assertEquals(field, clonedField);
+ }
+ }
+}
diff --git
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestDynamicIcebergCatalogWrapperProvider.java
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/provider/TestDynamicIcebergConfigProvider.java
similarity index 78%
rename from
iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestDynamicIcebergCatalogWrapperProvider.java
rename to
iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/provider/TestDynamicIcebergConfigProvider.java
index f9ffbb427..4eb5da5af 100644
---
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestDynamicIcebergCatalogWrapperProvider.java
+++
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/provider/TestDynamicIcebergConfigProvider.java
@@ -1,22 +1,22 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-package org.apache.gravitino.iceberg.provider;
+package org.apache.gravitino.iceberg.service.provider;
import java.util.HashMap;
import org.apache.gravitino.Catalog;
@@ -30,7 +30,7 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
-public class TestDynamicIcebergCatalogWrapperProvider {
+public class TestDynamicIcebergConfigProvider {
@Test
public void testValidIcebergTableOps() {
String hiveCatalogName = "hive_backend";
@@ -71,7 +71,7 @@ public class TestDynamicIcebergCatalogWrapperProvider {
}
});
- DynamicIcebergCatalogConfigProvider provider = new
DynamicIcebergCatalogConfigProvider();
+ DynamicIcebergConfigProvider provider = new DynamicIcebergConfigProvider();
GravitinoAdminClient client = Mockito.mock(GravitinoAdminClient.class);
Mockito.when(client.loadMetalake(Mockito.any())).thenReturn(gravitinoMetalake);
provider.setClient(client);
@@ -102,7 +102,7 @@ public class TestDynamicIcebergCatalogWrapperProvider {
GravitinoAdminClient client = Mockito.mock(GravitinoAdminClient.class);
Mockito.when(client.loadMetalake(Mockito.any())).thenReturn(gravitinoMetalake);
- DynamicIcebergCatalogConfigProvider provider = new
DynamicIcebergCatalogConfigProvider();
+ DynamicIcebergConfigProvider provider = new DynamicIcebergConfigProvider();
provider.setClient(client);
Assertions.assertThrowsExactly(
@@ -111,6 +111,6 @@ public class TestDynamicIcebergCatalogWrapperProvider {
IllegalArgumentException.class, () ->
provider.getIcebergCatalogConfig(""));
Assertions.assertThrowsExactly(
IllegalArgumentException.class,
- () ->
provider.getIcebergCatalogConfig(IcebergConstants.GRAVITINO_DEFAULT_CATALOG));
+ () ->
provider.getIcebergCatalogConfig(IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG));
}
}
diff --git
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestStaticIcebergCatalogWrapperProvider.java
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/provider/TestStaticIcebergConfigProvider.java
similarity index 79%
rename from
iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestStaticIcebergCatalogWrapperProvider.java
rename to
iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/provider/TestStaticIcebergConfigProvider.java
index 69f5b5ad2..3a4766016 100644
---
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestStaticIcebergCatalogWrapperProvider.java
+++
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/provider/TestStaticIcebergConfigProvider.java
@@ -1,22 +1,22 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-package org.apache.gravitino.iceberg.provider;
+package org.apache.gravitino.iceberg.service.provider;
import com.google.common.collect.Maps;
import java.util.Map;
@@ -32,13 +32,13 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
-public class TestStaticIcebergCatalogWrapperProvider {
+public class TestStaticIcebergConfigProvider {
@Test
public void testValidIcebergTableOps() {
String hiveCatalogName = "hive_backend";
String jdbcCatalogName = "jdbc_backend";
- String defaultCatalogName = IcebergConstants.GRAVITINO_DEFAULT_CATALOG;
+ String defaultCatalogName = IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG;
Map<String, String> config = Maps.newHashMap();
// hive backend catalog
@@ -60,7 +60,7 @@ public class TestStaticIcebergCatalogWrapperProvider {
config.put("catalog-backend", "memory");
config.put("warehouse", "/tmp/");
- StaticIcebergCatalogConfigProvider provider = new
StaticIcebergCatalogConfigProvider();
+ StaticIcebergConfigProvider provider = new StaticIcebergConfigProvider();
provider.initialize(config);
IcebergConfig hiveIcebergConfig =
provider.catalogConfigs.get(hiveCatalogName);
@@ -106,7 +106,7 @@ public class TestStaticIcebergCatalogWrapperProvider {
@ParameterizedTest
@ValueSource(strings = {"", "not_match"})
public void testInvalidIcebergTableOps(String catalogName) {
- StaticIcebergCatalogConfigProvider provider = new
StaticIcebergCatalogConfigProvider();
+ StaticIcebergConfigProvider provider = new StaticIcebergConfigProvider();
provider.initialize(Maps.newHashMap());
Optional<IcebergConfig> config =
provider.getIcebergCatalogConfig(catalogName);
diff --git
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergCatalogWrapperManagerForTest.java
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergCatalogWrapperManagerForTest.java
index 7d359926a..361b086d9 100644
---
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergCatalogWrapperManagerForTest.java
+++
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergCatalogWrapperManagerForTest.java
@@ -23,11 +23,13 @@ import java.util.Map;
import org.apache.gravitino.iceberg.common.IcebergConfig;
import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper;
import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager;
+import org.apache.gravitino.iceberg.service.provider.IcebergConfigProvider;
// Provide a custom catalogWrapper to do test like `registerTable`
public class IcebergCatalogWrapperManagerForTest extends
IcebergCatalogWrapperManager {
- public IcebergCatalogWrapperManagerForTest(Map<String, String> properties) {
- super(properties);
+ public IcebergCatalogWrapperManagerForTest(
+ Map<String, String> properties, IcebergConfigProvider configProvider) {
+ super(properties, configProvider);
}
@Override
diff --git
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java
index 1a085a251..1314a3bac 100644
---
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java
+++
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java
@@ -20,18 +20,25 @@
package org.apache.gravitino.iceberg.service.rest;
import com.google.common.collect.Maps;
+import java.util.Arrays;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
import org.apache.gravitino.credential.CredentialConstants;
import org.apache.gravitino.iceberg.common.IcebergConfig;
-import
org.apache.gravitino.iceberg.provider.StaticIcebergCatalogConfigProvider;
import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager;
import org.apache.gravitino.iceberg.service.IcebergExceptionMapper;
import org.apache.gravitino.iceberg.service.IcebergObjectMapperProvider;
+import
org.apache.gravitino.iceberg.service.dispatcher.IcebergTableEventDispatcher;
+import
org.apache.gravitino.iceberg.service.dispatcher.IcebergTableOperationDispatcher;
+import
org.apache.gravitino.iceberg.service.dispatcher.IcebergTableOperationExecutor;
import org.apache.gravitino.iceberg.service.extension.DummyCredentialProvider;
import org.apache.gravitino.iceberg.service.metrics.IcebergMetricsManager;
+import org.apache.gravitino.iceberg.service.provider.IcebergConfigProvider;
+import
org.apache.gravitino.iceberg.service.provider.IcebergConfigProviderFactory;
+import
org.apache.gravitino.iceberg.service.provider.StaticIcebergConfigProvider;
+import org.apache.gravitino.listener.EventBus;
import org.glassfish.hk2.utilities.binding.AbstractBinder;
import org.glassfish.jersey.jackson.JacksonFeature;
import org.glassfish.jersey.logging.LoggingFeature;
@@ -80,14 +87,24 @@ public class IcebergRestTestUtil {
String catalogConfigPrefix = "catalog." + PREFIX;
catalogConf.put(
IcebergConstants.ICEBERG_REST_CATALOG_CONFIG_PROVIDER,
- StaticIcebergCatalogConfigProvider.class.getName());
+ StaticIcebergConfigProvider.class.getName());
catalogConf.put(String.format("%s.catalog-backend-name",
catalogConfigPrefix), PREFIX);
catalogConf.put(
CredentialConstants.CREDENTIAL_PROVIDER_TYPE,
DummyCredentialProvider.DUMMY_CREDENTIAL_TYPE);
+ IcebergConfigProvider configProvider =
IcebergConfigProviderFactory.create(catalogConf);
+ configProvider.initialize(catalogConf);
// used to override register table interface
IcebergCatalogWrapperManager icebergCatalogWrapperManager =
- new IcebergCatalogWrapperManagerForTest(catalogConf);
+ new IcebergCatalogWrapperManagerForTest(catalogConf, configProvider);
+
+ EventBus eventBus = new EventBus(Arrays.asList());
+
+ IcebergTableOperationExecutor icebergTableOperationExecutor =
+ new IcebergTableOperationExecutor(icebergCatalogWrapperManager);
+ IcebergTableEventDispatcher icebergTableEventDispatcher =
+ new IcebergTableEventDispatcher(
+ icebergTableOperationExecutor, eventBus,
configProvider.getMetalakeName());
IcebergMetricsManager icebergMetricsManager = new
IcebergMetricsManager(new IcebergConfig());
resourceConfig.register(
@@ -96,6 +113,7 @@ public class IcebergRestTestUtil {
protected void configure() {
bind(icebergCatalogWrapperManager).to(IcebergCatalogWrapperManager.class).ranked(2);
bind(icebergMetricsManager).to(IcebergMetricsManager.class).ranked(2);
+
bind(icebergTableEventDispatcher).to(IcebergTableOperationDispatcher.class).ranked(2);
}
});
}
diff --git
a/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java
b/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java
index e383c65b7..36c112f00 100644
--- a/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java
+++ b/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java
@@ -80,7 +80,7 @@ public class GravitinoServer extends ResourceConfig {
}
public void initialize() {
- gravitinoEnv.initialize(serverConfig, true);
+ gravitinoEnv.initializeFullComponents(serverConfig);
JettyServerConfig jettyServerConfig =
JettyServerConfig.fromConfig(serverConfig, WEBSERVER_CONF_PREFIX);