This is an automated email from the ASF dual-hosted git repository.
baiyangtx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 417fed6c9 [AMORO-3393]: A stateless catalog manager implement (#3394)
417fed6c9 is described below
commit 417fed6c9b04e59501036d9dd225f64249c3ab60
Author: baiyangtx <[email protected]>
AuthorDate: Fri Jan 10 15:58:06 2025 +0800
[AMORO-3393]: A stateless catalog manager implement (#3394)
* stateless catalog manager
* stateless catalog manager
* ci
* ci
* improve db access
* add catalog meta cache
* add catalog meta cache
* ut
* ut
---------
Co-authored-by: zhangyongxiang.alpha <[email protected]>
---
.../apache/amoro/server/AmoroManagementConf.java | 6 +
.../apache/amoro/server/AmoroServiceContainer.java | 22 ++-
.../amoro/server/DefaultOptimizingService.java | 10 +-
.../apache/amoro/server/RestCatalogService.java | 7 +-
.../amoro/server/TableManagementService.java | 21 +-
.../{CatalogService.java => CatalogManager.java} | 11 +-
.../server/catalog/DefaultCatalogManager.java | 218 +++++++++++++++++++++
.../amoro/server/catalog/ExternalCatalog.java | 28 +--
.../amoro/server/catalog/InternalCatalog.java | 5 +
.../apache/amoro/server/catalog/ServerCatalog.java | 41 +++-
.../amoro/server/dashboard/DashboardServer.java | 10 +-
.../server/dashboard/ServerTableDescriptor.java | 8 +-
.../dashboard/controller/CatalogController.java | 28 +--
.../dashboard/controller/TableController.java | 18 +-
.../persistence/mapper/CatalogMetaMapper.java | 5 +-
.../amoro/server/table/DefaultTableService.java | 136 ++-----------
.../apache/amoro/server/table/TableService.java | 3 +-
.../amoro/server/terminal/TerminalManager.java | 8 +-
.../org/apache/amoro/server/AmsEnvironment.java | 15 +-
.../amoro/server/RestCatalogServiceTestBase.java | 7 +-
.../server/TestInternalIcebergCatalogService.java | 4 +-
.../amoro/server/catalog/TableCatalogTestBase.java | 4 +-
.../amoro/server/catalog/TestServerCatalog.java | 10 +-
.../optimizing/TestIcebergHadoopOptimizing.java | 2 +-
.../amoro/server/table/AMSTableTestBase.java | 8 +-
.../amoro/server/table/TableServiceTestBase.java | 8 +-
.../amoro/server/table/TestCatalogService.java | 38 ++--
.../amoro/server/table/TestDatabaseService.java | 4 +-
.../server/table/TestTableRuntimeHandler.java | 4 +-
.../amoro/server/table/TestTableService.java | 4 +-
dist/src/main/amoro-bin/conf/config.yaml | 3 +
31 files changed, 450 insertions(+), 246 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
index c76133774..dfbb0099d 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
@@ -51,6 +51,12 @@ public class AmoroManagementConf {
.defaultValue("admin")
.withDescription("The administrator password");
+ public static final ConfigOption<Duration>
CATALOG_META_CACHE_EXPIRATION_INTERVAL =
+ ConfigOptions.key("catalog-meta-cache.expiration-interval")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(60))
+ .withDescription("TTL for catalog metadata.");
+
public static final ConfigOption<Integer> TABLE_MANIFEST_IO_THREAD_COUNT =
ConfigOptions.key("table-manifest-io.thread-count")
.intType()
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
index 5c5ee9c46..2daa17199 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
@@ -28,6 +28,8 @@ import org.apache.amoro.api.OptimizingService;
import org.apache.amoro.config.ConfigHelpers;
import org.apache.amoro.config.Configurations;
import org.apache.amoro.exception.AmoroRuntimeException;
+import org.apache.amoro.server.catalog.CatalogManager;
+import org.apache.amoro.server.catalog.DefaultCatalogManager;
import org.apache.amoro.server.dashboard.DashboardServer;
import org.apache.amoro.server.dashboard.JavalinJsonMapper;
import org.apache.amoro.server.dashboard.response.ErrorResponse;
@@ -92,6 +94,7 @@ public class AmoroServiceContainer {
private final HighAvailabilityContainer haContainer;
private DataSource dataSource;
+ private CatalogManager catalogManager;
private DefaultTableService tableService;
private DefaultOptimizingService optimizingService;
private TerminalManager terminalManager;
@@ -146,8 +149,9 @@ public class AmoroServiceContainer {
EventsManager.getInstance();
MetricManager.getInstance();
- tableService = new DefaultTableService(serviceConfig);
- optimizingService = new DefaultOptimizingService(serviceConfig,
tableService);
+ catalogManager = new DefaultCatalogManager(serviceConfig);
+ tableService = new DefaultTableService(serviceConfig, catalogManager);
+ optimizingService = new DefaultOptimizingService(serviceConfig,
catalogManager, tableService);
LOG.info("Setting up AMS table executors...");
AsyncTableExecutors.getInstance().setup(tableService, serviceConfig);
@@ -164,7 +168,7 @@ public class AmoroServiceContainer {
addHandlerChain(AsyncTableExecutors.getInstance().getTagsAutoCreatingExecutor());
tableService.initialize();
LOG.info("AMS table service have been initialized");
- terminalManager = new TerminalManager(serviceConfig, tableService);
+ terminalManager = new TerminalManager(serviceConfig, catalogManager,
tableService);
initThriftService();
startThriftService();
@@ -240,8 +244,9 @@ public class AmoroServiceContainer {
private void initHttpService() {
DashboardServer dashboardServer =
- new DashboardServer(serviceConfig, tableService, optimizingService,
terminalManager);
- RestCatalogService restCatalogService = new
RestCatalogService(tableService);
+ new DashboardServer(
+ serviceConfig, catalogManager, tableService, optimizingService,
terminalManager);
+ RestCatalogService restCatalogService = new
RestCatalogService(catalogManager, tableService);
httpServer =
Javalin.create(
@@ -333,7 +338,7 @@ public class AmoroServiceContainer {
new AmoroTableMetastore.Processor<>(
ThriftServiceProxy.createProxy(
AmoroTableMetastore.Iface.class,
- new TableManagementService(tableService),
+ new TableManagementService(catalogManager, tableService),
AmoroRuntimeException::normalizeCompatibly));
tableManagementServer =
createThriftServer(
@@ -536,6 +541,11 @@ public class AmoroServiceContainer {
return this.tableService;
}
+ @VisibleForTesting
+ public CatalogManager getCatalogManager() {
+ return this.catalogManager;
+ }
+
@VisibleForTesting
public OptimizerManager getOptimizingService() {
return this.optimizingService;
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
index 5d13c6dfb..bc886423b 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
@@ -37,6 +37,7 @@ import org.apache.amoro.exception.TaskNotFoundException;
import org.apache.amoro.properties.CatalogMetaProperties;
import org.apache.amoro.resource.Resource;
import org.apache.amoro.resource.ResourceGroup;
+import org.apache.amoro.server.catalog.CatalogManager;
import org.apache.amoro.server.optimizing.OptimizingQueue;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.optimizing.TaskRuntime;
@@ -97,17 +98,22 @@ public class DefaultOptimizingService extends
StatedPersistentBase
private final Map<String, OptimizingQueue> optimizingQueueByToken = new
ConcurrentHashMap<>();
private final Map<String, OptimizerInstance> authOptimizers = new
ConcurrentHashMap<>();
private final OptimizerKeeper optimizerKeeper = new OptimizerKeeper();
+ private final CatalogManager catalogManager;
private final TableService tableService;
private final RuntimeHandlerChain tableHandlerChain;
private final ExecutorService planExecutor;
- public DefaultOptimizingService(Configurations serviceConfig,
DefaultTableService tableService) {
+ public DefaultOptimizingService(
+ Configurations serviceConfig,
+ CatalogManager catalogManager,
+ DefaultTableService tableService) {
this.optimizerTouchTimeout =
serviceConfig.getLong(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT);
this.taskAckTimeout =
serviceConfig.getLong(AmoroManagementConf.OPTIMIZER_TASK_ACK_TIMEOUT);
this.maxPlanningParallelism =
serviceConfig.getInteger(AmoroManagementConf.OPTIMIZER_MAX_PLANNING_PARALLELISM);
this.pollingTimeout =
serviceConfig.getLong(AmoroManagementConf.OPTIMIZER_POLLING_TIMEOUT);
this.tableService = tableService;
+ this.catalogManager = catalogManager;
this.tableHandlerChain = new TableRuntimeHandlerImpl();
this.planExecutor =
Executors.newCachedThreadPool(
@@ -391,7 +397,7 @@ public class DefaultOptimizingService extends
StatedPersistentBase
}
public boolean canDeleteResourceGroup(String name) {
- for (CatalogMeta catalogMeta : tableService.listCatalogMetas()) {
+ for (CatalogMeta catalogMeta : catalogManager.listCatalogMetas()) {
if (catalogMeta.getCatalogProperties() != null
&& catalogMeta
.getCatalogProperties()
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/RestCatalogService.java
b/amoro-ams/src/main/java/org/apache/amoro/server/RestCatalogService.java
index a58bfb679..c3bd12b69 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/RestCatalogService.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/RestCatalogService.java
@@ -39,6 +39,7 @@ import org.apache.amoro.TableFormat;
import org.apache.amoro.events.IcebergReportEvent;
import org.apache.amoro.exception.ObjectNotExistsException;
import org.apache.amoro.properties.CatalogMetaProperties;
+import org.apache.amoro.server.catalog.CatalogManager;
import org.apache.amoro.server.catalog.InternalCatalog;
import org.apache.amoro.server.catalog.ServerCatalog;
import org.apache.amoro.server.manager.EventsManager;
@@ -106,9 +107,11 @@ public class RestCatalogService extends PersistentBase {
private final JavalinJackson jsonMapper;
+ private final CatalogManager catalogManager;
private final TableService tableService;
- public RestCatalogService(TableService tableService) {
+ public RestCatalogService(CatalogManager catalogManager, TableService
tableService) {
+ this.catalogManager = catalogManager;
this.tableService = tableService;
ObjectMapper objectMapper = jsonMapper();
this.jsonMapper = new JavalinJackson(objectMapper);
@@ -432,7 +435,7 @@ public class RestCatalogService extends PersistentBase {
private InternalCatalog getCatalog(String catalog) {
Preconditions.checkNotNull(catalog, "lack required path variables:
catalog");
- ServerCatalog internalCatalog = tableService.getServerCatalog(catalog);
+ ServerCatalog internalCatalog = catalogManager.getServerCatalog(catalog);
Preconditions.checkArgument(
internalCatalog instanceof InternalCatalog, "The catalog is not an
iceberg rest catalog");
Set<TableFormat> tableFormats =
CatalogUtil.tableFormats(internalCatalog.getMetadata());
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/TableManagementService.java
b/amoro-ams/src/main/java/org/apache/amoro/server/TableManagementService.java
index 92fa90f22..4ed25538b 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/TableManagementService.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/TableManagementService.java
@@ -29,6 +29,7 @@ import org.apache.amoro.api.OperationConflictException;
import org.apache.amoro.api.TableCommitMeta;
import org.apache.amoro.api.TableIdentifier;
import org.apache.amoro.api.TableMeta;
+import org.apache.amoro.server.catalog.CatalogManager;
import org.apache.amoro.server.catalog.InternalCatalog;
import org.apache.amoro.server.catalog.ServerCatalog;
import org.apache.amoro.server.table.TableMetadata;
@@ -42,9 +43,11 @@ import java.util.stream.Collectors;
public class TableManagementService implements AmoroTableMetastore.Iface {
+ private final CatalogManager catalogManager;
private final TableService tableService;
- public TableManagementService(TableService tableService) {
+ public TableManagementService(CatalogManager catalogManager, TableService
tableService) {
+ this.catalogManager = catalogManager;
this.tableService = tableService;
}
@@ -53,29 +56,29 @@ public class TableManagementService implements
AmoroTableMetastore.Iface {
@Override
public List<CatalogMeta> getCatalogs() {
- return tableService.listCatalogMetas();
+ return catalogManager.listCatalogMetas();
}
@Override
public CatalogMeta getCatalog(String name) {
- return tableService.getCatalogMeta(name);
+ return catalogManager.getCatalogMeta(name);
}
@Override
public List<String> getDatabases(String catalogName) {
- ServerCatalog serverCatalog = tableService.getServerCatalog(catalogName);
+ ServerCatalog serverCatalog = catalogManager.getServerCatalog(catalogName);
return serverCatalog.listDatabases();
}
@Override
public void createDatabase(String catalogName, String database) {
- InternalCatalog serverCatalog =
tableService.getInternalCatalog(catalogName);
+ InternalCatalog serverCatalog =
catalogManager.getInternalCatalog(catalogName);
serverCatalog.createDatabase(database);
}
@Override
public void dropDatabase(String catalogName, String database) {
- InternalCatalog serverCatalog =
tableService.getInternalCatalog(catalogName);
+ InternalCatalog serverCatalog =
catalogManager.getInternalCatalog(catalogName);
serverCatalog.dropDatabase(database);
}
@@ -87,7 +90,7 @@ public class TableManagementService implements
AmoroTableMetastore.Iface {
ServerTableIdentifier identifier =
ServerTableIdentifier.of(
tableMeta.getTableIdentifier(),
TableFormat.valueOf(tableMeta.getFormat()));
- InternalCatalog catalog =
tableService.getInternalCatalog(identifier.getCatalog());
+ InternalCatalog catalog =
catalogManager.getInternalCatalog(identifier.getCatalog());
CatalogMeta catalogMeta = catalog.getMetadata();
TableMetadata tableMetadata = new TableMetadata(identifier, tableMeta,
catalogMeta);
tableService.createTable(catalog.name(), tableMetadata);
@@ -95,7 +98,7 @@ public class TableManagementService implements
AmoroTableMetastore.Iface {
@Override
public List<TableMeta> listTables(String catalogName, String database) {
- InternalCatalog serverCatalog =
tableService.getInternalCatalog(catalogName);
+ InternalCatalog serverCatalog =
catalogManager.getInternalCatalog(catalogName);
List<TableMetadata> tableMetadataList =
serverCatalog.listTableMetadataInDatabase(database);
return tableMetadataList.stream()
.map(TableMetadata::buildTableMeta)
@@ -104,7 +107,7 @@ public class TableManagementService implements
AmoroTableMetastore.Iface {
@Override
public TableMeta getTable(TableIdentifier tableIdentifier) {
- InternalCatalog serverCatalog =
tableService.getInternalCatalog(tableIdentifier.getCatalog());
+ InternalCatalog serverCatalog =
catalogManager.getInternalCatalog(tableIdentifier.getCatalog());
TableMetadata tableMetadata =
serverCatalog.loadTableMetadata(
tableIdentifier.getDatabase(), tableIdentifier.getTableName());
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/CatalogService.java
b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/CatalogManager.java
similarity index 92%
rename from
amoro-ams/src/main/java/org/apache/amoro/server/catalog/CatalogService.java
rename to
amoro-ams/src/main/java/org/apache/amoro/server/catalog/CatalogManager.java
index edca66f9a..35ba55bae 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/CatalogService.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/CatalogManager.java
@@ -22,8 +22,8 @@ import org.apache.amoro.api.CatalogMeta;
import java.util.List;
-/** The CatalogService interface defines the operations that can be performed
on catalogs. */
-public interface CatalogService {
+/** The CatalogManager interface defines the operations that can be performed
on catalogs. */
+public interface CatalogManager {
/**
* Returns a list of CatalogMeta objects.
*
@@ -62,6 +62,13 @@ public interface CatalogService {
*/
InternalCatalog getInternalCatalog(String catalogName);
+ /**
+ * Retrieves all ExternalCatalogs.
+ *
+ * @return a list of ExternalCatalogs
+ */
+ List<ExternalCatalog> getExternalCatalogs();
+
/**
* Creates a catalog based on the provided catalog meta information. The
catalog name is obtained
* from the catalog meta.
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/DefaultCatalogManager.java
b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/DefaultCatalogManager.java
new file mode 100644
index 000000000..c2ead9592
--- /dev/null
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/DefaultCatalogManager.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.catalog;
+
+import org.apache.amoro.api.CatalogMeta;
+import org.apache.amoro.config.Configurations;
+import org.apache.amoro.exception.AlreadyExistsException;
+import org.apache.amoro.exception.IllegalMetadataException;
+import org.apache.amoro.exception.ObjectNotExistsException;
+import org.apache.amoro.properties.CatalogMetaProperties;
+import org.apache.amoro.server.AmoroManagementConf;
+import org.apache.amoro.server.persistence.PersistentBase;
+import org.apache.amoro.server.persistence.mapper.CatalogMetaMapper;
+import org.apache.amoro.shade.guava32.com.google.common.cache.CacheBuilder;
+import org.apache.amoro.shade.guava32.com.google.common.cache.CacheLoader;
+import org.apache.amoro.shade.guava32.com.google.common.cache.LoadingCache;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * Default {@link CatalogManager} implementation that keeps no authoritative state in memory.
+ *
+ * <p>The persistent store accessed through {@link CatalogMetaMapper} is the source of truth.
+ * Catalog metadata is read through a loading cache whose entries expire after the interval
+ * configured by {@link AmoroManagementConf#CATALOG_META_CACHE_EXPIRATION_INTERVAL}, and {@link
+ * ServerCatalog} instances are kept in a local map and reloaded with the latest metadata each time
+ * they are handed out.
+ */
+public class DefaultCatalogManager extends PersistentBase implements CatalogManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DefaultCatalogManager.class);
+  protected final Configurations serverConfiguration;
+  /** Catalog name to metadata; {@code Optional.empty()} records that the catalog is absent. */
+  private final LoadingCache<String, Optional<CatalogMeta>> metaCache;
+
+  /** Locally built catalog instances, keyed by catalog name. */
+  private final Map<String, ServerCatalog> serverCatalogMap = Maps.newConcurrentMap();
+
+  /**
+   * Builds the metadata cache and eagerly loads every catalog found in the persistent store.
+   *
+   * @param serverConfiguration server configuration, also passed to {@link CatalogBuilder}
+   */
+  public DefaultCatalogManager(Configurations serverConfiguration) {
+    this.serverConfiguration = serverConfiguration;
+    Duration cacheTtl =
+        serverConfiguration.get(AmoroManagementConf.CATALOG_META_CACHE_EXPIRATION_INTERVAL);
+    metaCache =
+        CacheBuilder.newBuilder()
+            .maximumSize(100)
+            .expireAfterWrite(cacheTtl)
+            .build(
+                new CacheLoader<String, Optional<CatalogMeta>>() {
+                  @Override
+                  public @NotNull Optional<CatalogMeta> load(@NotNull String key) throws Exception {
+                    return Optional.ofNullable(
+                        getAs(CatalogMetaMapper.class, mapper -> mapper.getCatalog(key)));
+                  }
+                });
+
+    listCatalogMetas()
+        .forEach(
+            c -> {
+              ServerCatalog serverCatalog =
+                  CatalogBuilder.buildServerCatalog(c, serverConfiguration);
+              serverCatalogMap.put(c.getCatalogName(), serverCatalog);
+              metaCache.put(c.getCatalogName(), Optional.of(c));
+              LOG.info("Load catalog {}, type:{}", c.getCatalogName(), c.getCatalogType());
+            });
+    LOG.info("DefaultCatalogManager initialized, total catalogs: {}", serverCatalogMap.size());
+  }
+
+  /**
+   * Lists all catalog metadata from the persistent store and refreshes the metadata cache with the
+   * returned entries.
+   */
+  @Override
+  public List<CatalogMeta> listCatalogMetas() {
+    List<CatalogMeta> metas = getAs(CatalogMetaMapper.class, CatalogMetaMapper::getCatalogs);
+    // Refreshing the cache is a required side effect, so do it in an explicit loop instead of
+    // Stream.peek, which is meant for debugging only.
+    for (CatalogMeta meta : metas) {
+      metaCache.put(meta.getCatalogName(), Optional.of(meta));
+    }
+    return metas;
+  }
+
+  /**
+   * Returns the (possibly cached) metadata of a catalog.
+   *
+   * @throws ObjectNotExistsException if no such catalog exists
+   */
+  @Override
+  public CatalogMeta getCatalogMeta(String catalogName) {
+    return getCatalogMetaOptional(catalogName)
+        .orElseThrow(() -> new ObjectNotExistsException("Catalog " + catalogName));
+  }
+
+  /** Reads catalog metadata through the cache; empty when the catalog does not exist. */
+  private Optional<CatalogMeta> getCatalogMetaOptional(String catalogName) {
+    try {
+      return metaCache.get(catalogName);
+    } catch (ExecutionException e) {
+      throw new RuntimeException("Failed to load metadata of catalog " + catalogName, e);
+    }
+  }
+
+  @Override
+  public boolean catalogExist(String catalogName) {
+    return getCatalogMetaOptional(catalogName).isPresent();
+  }
+
+  /**
+   * Returns the server catalog for the given name, reloaded with the latest known metadata.
+   *
+   * @throws ObjectNotExistsException if no such catalog exists; any local instance is disposed
+   */
+  @Override
+  public ServerCatalog getServerCatalog(String catalogName) {
+    Optional<CatalogMeta> catalogMeta = getCatalogMetaOptional(catalogName);
+    if (!catalogMeta.isPresent()) {
+      // remove if catalog is deleted
+      disposeCatalog(catalogName);
+      throw new ObjectNotExistsException("Catalog " + catalogName);
+    }
+    return resolveServerCatalog(catalogMeta.get());
+  }
+
+  /**
+   * Returns the catalog for the given name as an {@link InternalCatalog}.
+   *
+   * @throws ObjectNotExistsException if the catalog does not exist or is not internal
+   */
+  @Override
+  public InternalCatalog getInternalCatalog(String catalogName) {
+    // getServerCatalog never returns null; it throws when the catalog is missing.
+    ServerCatalog serverCatalog = getServerCatalog(catalogName);
+    if (serverCatalog.isInternal()) {
+      return (InternalCatalog) serverCatalog;
+    }
+    throw new ObjectNotExistsException("Catalog " + catalogName + " is not internal catalog");
+  }
+
+  /** Returns every non-internal catalog found in the persistent store. */
+  @Override
+  public List<ExternalCatalog> getExternalCatalogs() {
+    return listCatalogMetas().stream()
+        .filter(c -> !isInternal(c))
+        .map(this::resolveServerCatalog)
+        .map(c -> (ExternalCatalog) c)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Validates the metadata by building a catalog from it, persists it and registers the instance.
+   *
+   * @throws AlreadyExistsException if a catalog with the same name is already known
+   */
+  @Override
+  public void createCatalog(CatalogMeta catalogMeta) {
+    if (catalogExist(catalogMeta.getCatalogName())) {
+      throw new AlreadyExistsException("Catalog " + catalogMeta.getCatalogName());
+    }
+    // Build to make sure the catalog is valid
+    ServerCatalog catalog = CatalogBuilder.buildServerCatalog(catalogMeta, serverConfiguration);
+    doAs(CatalogMetaMapper.class, mapper -> mapper.insertCatalog(catalog.getMetadata()));
+    // Drops any stale local instance and the cached "absent" entry written by catalogExist.
+    disposeCatalog(catalogMeta.getCatalogName());
+    serverCatalogMap.put(catalogMeta.getCatalogName(), catalog);
+    LOG.info(
+        "Create catalog {}, type:{}", catalogMeta.getCatalogName(), catalogMeta.getCatalogType());
+  }
+
+  /**
+   * Deletes a catalog from the persistent store and disposes the local instance.
+   *
+   * @throws ObjectNotExistsException if the catalog is not in the persistent store
+   * @throws IllegalMetadataException if an internal catalog still has databases or tables
+   */
+  @Override
+  public void dropCatalog(String catalogName) {
+    // Ask the store directly: the cache may be stale, and a missing row used to surface as a
+    // NullPointerException in isInternal(meta).
+    CatalogMeta existing =
+        getAs(CatalogMetaMapper.class, mapper -> mapper.getCatalog(catalogName));
+    if (existing == null) {
+      disposeCatalog(catalogName);
+      throw new ObjectNotExistsException("Catalog " + catalogName);
+    }
+
+    doAs(
+        CatalogMetaMapper.class,
+        mapper -> {
+          CatalogMeta meta = mapper.getCatalog(catalogName);
+          if (meta == null) {
+            // Removed concurrently (e.g. by another server); nothing left to delete.
+            return;
+          }
+          if (isInternal(meta)) {
+            int dbCount = mapper.selectDatabaseCount(catalogName);
+            int tblCount = mapper.selectTableCount(catalogName);
+            if (dbCount > 0 || tblCount > 0) {
+              throw new IllegalMetadataException(
+                  "Cannot drop internal catalog with databases or tables");
+            }
+          }
+          mapper.deleteCatalog(catalogName);
+        });
+
+    // Also invalidates the cached metadata.
+    disposeCatalog(catalogName);
+  }
+
+  /**
+   * Persists new metadata for an existing catalog.
+   *
+   * @throws ObjectNotExistsException if no such catalog exists
+   * @throws IllegalMetadataException if the update would change the catalog type
+   */
+  @Override
+  public void updateCatalog(CatalogMeta catalogMeta) {
+    ServerCatalog catalog = getServerCatalog(catalogMeta.getCatalogName());
+    validateCatalogUpdate(catalog.getMetadata(), catalogMeta);
+
+    try {
+      catalog.updateMetadata(catalogMeta);
+    } finally {
+      // Invalidate after the write. Invalidating before it lets a concurrent reader re-cache the
+      // old metadata for a full TTL and reload the catalog back to it.
+      metaCache.invalidate(catalogMeta.getCatalogName());
+    }
+    LOG.info("Update catalog metadata: {}", catalogMeta.getCatalogName());
+  }
+
+  private void validateCatalogUpdate(CatalogMeta oldMeta, CatalogMeta newMeta) {
+    if (!oldMeta.getCatalogType().equals(newMeta.getCatalogType())) {
+      throw new IllegalMetadataException("Cannot update catalog type");
+    }
+  }
+
+  /**
+   * Returns the local catalog instance for the given metadata, building it when absent and
+   * rebuilding it when the cached instance was created for a different catalog type (the catalog
+   * was dropped and recreated elsewhere). The instance is reloaded with {@code meta} before it is
+   * returned.
+   */
+  private ServerCatalog resolveServerCatalog(CatalogMeta meta) {
+    ServerCatalog serverCatalog =
+        serverCatalogMap.compute(
+            meta.getCatalogName(),
+            (name, cached) -> {
+              if (cached != null) {
+                if (java.util.Objects.equals(
+                    cached.getMetadata().getCatalogType(), meta.getCatalogType())) {
+                  return cached;
+                }
+                LOG.info("Dispose catalog: {}", name);
+                cached.dispose();
+              }
+              return CatalogBuilder.buildServerCatalog(meta, serverConfiguration);
+            });
+    serverCatalog.reload(meta);
+    return serverCatalog;
+  }
+
+  /** Removes and disposes the local catalog instance, and invalidates its cached metadata. */
+  private void disposeCatalog(String name) {
+    serverCatalogMap.computeIfPresent(
+        name,
+        (n, c) -> {
+          LOG.info("Dispose catalog: {}", n);
+          c.dispose();
+          return null;
+        });
+    metaCache.invalidate(name);
+  }
+
+  private static boolean isInternal(CatalogMeta meta) {
+    return CatalogMetaProperties.CATALOG_TYPE_AMS.equalsIgnoreCase(meta.getCatalogType());
+  }
+}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/ExternalCatalog.java
b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/ExternalCatalog.java
index 5ae37a304..8148f64a3 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/ExternalCatalog.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/ExternalCatalog.java
@@ -28,8 +28,6 @@ import org.apache.amoro.api.CatalogMeta;
import org.apache.amoro.properties.CatalogMetaProperties;
import org.apache.amoro.server.persistence.mapper.TableMetaMapper;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
-import org.apache.amoro.table.TableMetaStore;
-import org.apache.amoro.utils.CatalogUtil;
import java.util.ArrayList;
import java.util.List;
@@ -40,18 +38,21 @@ import java.util.stream.Collectors;
public class ExternalCatalog extends ServerCatalog {
UnifiedCatalog unifiedCatalog;
- TableMetaStore tableMetaStore;
private Pattern tableFilterPattern;
private Pattern databaseFilterPattern;
protected ExternalCatalog(CatalogMeta metadata) {
super(metadata);
- this.tableMetaStore = CatalogUtil.buildMetaStore(metadata);
this.unifiedCatalog =
- this.tableMetaStore.doAs(
- () -> new CommonUnifiedCatalog(this::getMetadata,
Maps.newHashMap()));
- updateTableFilter(metadata);
- updateDatabaseFilter(metadata);
+ metaStore.doAs(() -> new CommonUnifiedCatalog(this::getMetadata,
Maps.newHashMap()));
+ catalogMetadataChanged();
+ }
+
+ @Override
+ protected void catalogMetadataChanged() {
+ this.unifiedCatalog.refresh();
+ updateDatabaseFilter(getMetadata());
+ updateTableFilter(getMetadata());
}
public void syncTable(String database, String tableName, TableFormat format)
{
@@ -73,15 +74,6 @@ public class ExternalCatalog extends ServerCatalog {
mapper -> mapper.deleteTableIdByName(getMetadata().getCatalogName(),
database, tableName));
}
- @Override
- public void updateMetadata(CatalogMeta metadata) {
- super.updateMetadata(metadata);
- this.tableMetaStore = CatalogUtil.buildMetaStore(metadata);
- this.unifiedCatalog.refresh();
- updateDatabaseFilter(metadata);
- updateTableFilter(metadata);
- }
-
@Override
public boolean databaseExists(String database) {
return doAs(() -> unifiedCatalog.databaseExists(database));
@@ -158,6 +150,6 @@ public class ExternalCatalog extends ServerCatalog {
}
private <T> T doAs(Callable<T> callable) {
- return tableMetaStore.doAs(callable);
+ return metaStore.doAs(callable);
}
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalCatalog.java
b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalCatalog.java
index 2c0a20181..b823f94b1 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalCatalog.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalCatalog.java
@@ -44,6 +44,11 @@ public abstract class InternalCatalog extends ServerCatalog {
super(metadata);
}
+ @Override
+ public boolean isInternal() {
+ return true;
+ }
+
@Override
public List<String> listDatabases() {
return getAs(
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/ServerCatalog.java
b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/ServerCatalog.java
index dfeede9c2..faa5974d8 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/ServerCatalog.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/ServerCatalog.java
@@ -21,18 +21,21 @@ package org.apache.amoro.server.catalog;
import org.apache.amoro.AmoroTable;
import org.apache.amoro.TableIDWithFormat;
import org.apache.amoro.api.CatalogMeta;
-import org.apache.amoro.exception.IllegalMetadataException;
import org.apache.amoro.server.persistence.PersistentBase;
import org.apache.amoro.server.persistence.mapper.CatalogMetaMapper;
+import org.apache.amoro.table.TableMetaStore;
+import org.apache.amoro.utils.CatalogUtil;
import java.util.List;
public abstract class ServerCatalog extends PersistentBase {
private volatile CatalogMeta metadata;
+ protected volatile TableMetaStore metaStore;
protected ServerCatalog(CatalogMeta metadata) {
this.metadata = metadata;
+ this.metaStore = CatalogUtil.buildMetaStore(metadata);
}
public String name() {
@@ -46,6 +49,26 @@ public abstract class ServerCatalog extends PersistentBase {
public void updateMetadata(CatalogMeta metadata) {
doAs(CatalogMetaMapper.class, mapper -> mapper.updateCatalog(metadata));
this.metadata = metadata;
+ this.metaStore = CatalogUtil.buildMetaStore(metadata);
+ catalogMetadataChanged();
+ }
+
+ public void reload() {
+ CatalogMeta meta =
+ getAs(CatalogMetaMapper.class, mapper ->
mapper.getCatalog(metadata.getCatalogName()));
+ if (meta == null) {
+ throw new IllegalStateException("Catalog " + metadata.getCatalogName() +
" is dropped.");
+ }
+ this.reload(meta);
+ }
+
+ public void reload(CatalogMeta meta) {
+ if (this.metadata.equals(meta)) {
+ return;
+ }
+ this.metadata = meta;
+ this.metaStore = CatalogUtil.buildMetaStore(meta);
+ catalogMetadataChanged();
}
public abstract boolean databaseExists(String database);
@@ -61,13 +84,13 @@ public abstract class ServerCatalog extends PersistentBase {
public abstract AmoroTable<?> loadTable(String database, String tableName);
public void dispose() {
- doAsTransaction(
- () ->
- doAsExisted(
- CatalogMetaMapper.class,
- mapper -> mapper.deleteCatalog(name()),
- () ->
- new IllegalMetadataException(
- "Catalog " + name() + " has more than one database or
table")));
+ // do resource clean up
}
+
+ public boolean isInternal() {
+ return false;
+ }
+
+ /** Called when catalog metadata is changed. */
+ protected void catalogMetadataChanged() {}
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java
index ccedcfaee..01c34c2c7 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java
@@ -37,6 +37,7 @@ import org.apache.amoro.exception.SignatureCheckException;
import org.apache.amoro.server.AmoroManagementConf;
import org.apache.amoro.server.DefaultOptimizingService;
import org.apache.amoro.server.RestCatalogService;
+import org.apache.amoro.server.catalog.CatalogManager;
import org.apache.amoro.server.dashboard.controller.CatalogController;
import org.apache.amoro.server.dashboard.controller.HealthCheckController;
import org.apache.amoro.server.dashboard.controller.LoginController;
@@ -96,19 +97,22 @@ public class DashboardServer {
public DashboardServer(
Configurations serviceConfig,
+ CatalogManager catalogManager,
TableService tableService,
DefaultOptimizingService optimizerManager,
TerminalManager terminalManager) {
PlatformFileManager platformFileManager = new PlatformFileManager();
- this.catalogController = new CatalogController(tableService,
platformFileManager);
+ this.catalogController = new CatalogController(catalogManager,
platformFileManager);
this.healthCheckController = new HealthCheckController();
this.loginController = new LoginController(serviceConfig);
this.optimizerGroupController = new OptimizerGroupController(tableService,
optimizerManager);
this.optimizerController = new OptimizerController(optimizerManager);
this.platformFileInfoController = new
PlatformFileInfoController(platformFileManager);
this.settingController = new SettingController(serviceConfig,
optimizerManager);
- ServerTableDescriptor tableDescriptor = new
ServerTableDescriptor(tableService, serviceConfig);
- this.tableController = new TableController(tableService, tableDescriptor,
serviceConfig);
+ ServerTableDescriptor tableDescriptor =
+ new ServerTableDescriptor(catalogManager, tableService, serviceConfig);
+ this.tableController =
+ new TableController(catalogManager, tableService, tableDescriptor,
serviceConfig);
this.terminalController = new TerminalController(terminalManager);
this.versionController = new VersionController();
this.overviewController = new OverviewController();
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/ServerTableDescriptor.java
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/ServerTableDescriptor.java
index 7b8ed7b8a..89e4aded9 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/ServerTableDescriptor.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/ServerTableDescriptor.java
@@ -23,6 +23,7 @@ import org.apache.amoro.TableFormat;
import org.apache.amoro.api.TableIdentifier;
import org.apache.amoro.config.Configurations;
import org.apache.amoro.process.ProcessStatus;
+import org.apache.amoro.server.catalog.CatalogManager;
import org.apache.amoro.server.catalog.ServerCatalog;
import org.apache.amoro.server.persistence.PersistentBase;
import org.apache.amoro.server.table.TableService;
@@ -50,10 +51,13 @@ public class ServerTableDescriptor extends PersistentBase {
private final Map<TableFormat, FormatTableDescriptor> formatDescriptorMap =
new HashMap<>();
+ private final CatalogManager catalogManager;
private final TableService tableService;
- public ServerTableDescriptor(TableService tableService, Configurations
serviceConfig) {
+ public ServerTableDescriptor(
+ CatalogManager catalogManager, TableService tableService, Configurations
serviceConfig) {
this.tableService = tableService;
+ this.catalogManager = catalogManager;
// All table formats will jointly reuse the work thread pool named iceberg-worker-pool-%d
ExecutorService executorService = ThreadPools.getWorkerPool();
@@ -146,7 +150,7 @@ public class ServerTableDescriptor extends PersistentBase {
}
private AmoroTable<?> loadTable(TableIdentifier identifier) {
- ServerCatalog catalog =
tableService.getServerCatalog(identifier.getCatalog());
+ ServerCatalog catalog =
catalogManager.getServerCatalog(identifier.getCatalog());
return catalog.loadTable(identifier.getDatabase(),
identifier.getTableName());
}
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/CatalogController.java
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/CatalogController.java
index 6772d87c5..7bb783b09 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/CatalogController.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/CatalogController.java
@@ -56,7 +56,7 @@ import org.apache.amoro.TableFormat;
import org.apache.amoro.api.CatalogMeta;
import org.apache.amoro.properties.CatalogMetaProperties;
import org.apache.amoro.server.AmoroManagementConf;
-import org.apache.amoro.server.catalog.CatalogService;
+import org.apache.amoro.server.catalog.CatalogManager;
import org.apache.amoro.server.catalog.InternalCatalog;
import org.apache.amoro.server.catalog.ServerCatalog;
import org.apache.amoro.server.dashboard.PlatformFileManager;
@@ -66,7 +66,6 @@ import
org.apache.amoro.server.dashboard.model.CatalogSettingInfo.ConfigFileItem
import org.apache.amoro.server.dashboard.response.OkResponse;
import org.apache.amoro.server.dashboard.utils.DesensitizationUtil;
import org.apache.amoro.server.dashboard.utils.PropertiesUtil;
-import org.apache.amoro.server.table.TableService;
import org.apache.amoro.shade.guava32.com.google.common.base.Objects;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableMap;
@@ -166,10 +165,11 @@ public class CatalogController {
}
private final PlatformFileManager platformFileInfoService;
- private final CatalogService tableService;
+ private final CatalogManager catalogService;
- public CatalogController(TableService tableService, PlatformFileManager
platformFileInfoService) {
- this.tableService = tableService;
+ public CatalogController(
+ CatalogManager catalogService, PlatformFileManager
platformFileInfoService) {
+ this.catalogService = catalogService;
this.platformFileInfoService = platformFileInfoService;
}
@@ -544,11 +544,11 @@ public class CatalogController {
public void createCatalog(Context ctx) {
CatalogRegisterInfo info = ctx.bodyAsClass(CatalogRegisterInfo.class);
validateCatalogRegisterInfo(info);
- if (tableService.catalogExist(info.getName())) {
+ if (catalogService.catalogExist(info.getName())) {
throw new RuntimeException("Duplicate catalog name!");
}
CatalogMeta catalogMeta = constructCatalogMeta(info, null);
- tableService.createCatalog(catalogMeta);
+ catalogService.createCatalog(catalogMeta);
ctx.json(OkResponse.of(""));
}
@@ -578,10 +578,10 @@ public class CatalogController {
/** Get detail of some catalog. */
public void getCatalogDetail(Context ctx) {
String catalogName = ctx.pathParam("catalogName");
- CatalogMeta catalogMeta = tableService.getCatalogMeta(catalogName);
+ CatalogMeta catalogMeta = catalogService.getCatalogMeta(catalogName);
CatalogSettingInfo info = new CatalogSettingInfo();
- if (tableService.catalogExist(catalogName)) {
+ if (catalogService.catalogExist(catalogName)) {
info.setName(catalogMeta.getCatalogName());
// We create ams catalog with type hadoop in v0.3, we should be compatible with it.
if (CATALOG_TYPE_HADOOP.equals(catalogMeta.getCatalogType())
@@ -626,13 +626,13 @@ public class CatalogController {
public void updateCatalog(Context ctx) {
CatalogRegisterInfo info = ctx.bodyAsClass(CatalogRegisterInfo.class);
validateCatalogRegisterInfo(info);
- CatalogMeta optCatalog = tableService.getCatalogMeta(info.getName());
+ CatalogMeta optCatalog = catalogService.getCatalogMeta(info.getName());
Preconditions.checkNotNull(optCatalog, "Catalog not exist!");
unMaskSensitiveData(info, optCatalog);
// check only some item can be modified!
CatalogMeta catalogMeta = constructCatalogMeta(info, optCatalog);
- tableService.updateCatalog(catalogMeta);
+ catalogService.updateCatalog(catalogMeta);
ctx.json(OkResponse.ok());
}
@@ -641,7 +641,7 @@ public class CatalogController {
String catalogName = ctx.pathParam("catalogName");
Preconditions.checkArgument(
StringUtils.isNotEmpty(ctx.pathParam("catalogName")), "Catalog name is empty!");
- ServerCatalog serverCatalog = tableService.getServerCatalog(catalogName);
+ ServerCatalog serverCatalog = catalogService.getServerCatalog(catalogName);
if (serverCatalog instanceof InternalCatalog) {
ctx.json(OkResponse.of(serverCatalog.listTables().size() == 0));
} else {
@@ -654,7 +654,7 @@ public class CatalogController {
String catalogName = ctx.pathParam("catalogName");
Preconditions.checkArgument(
StringUtils.isNotEmpty(ctx.pathParam("catalogName")), "Catalog name is empty!");
- tableService.dropCatalog(catalogName);
+ catalogService.dropCatalog(catalogName);
ctx.json(OkResponse.of("OK"));
}
@@ -675,7 +675,7 @@ public class CatalogController {
&& StringUtils.isNotEmpty(configKey),
"Catalog name or auth type or config key is null!");
- CatalogMeta catalogMeta = tableService.getCatalogMeta(catalogName);
+ CatalogMeta catalogMeta = catalogService.getCatalogMeta(catalogName);
if (CONFIG_TYPE_STORAGE.equalsIgnoreCase(confType)) {
Map<String, String> storageConfig = catalogMeta.getStorageConfigs();
String key = configKey.replaceAll("-", "\\.");
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java
index 7dfb7f7c4..0606eb49f 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java
@@ -36,6 +36,7 @@ import
org.apache.amoro.optimizing.plan.AbstractOptimizingEvaluator;
import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.properties.CatalogMetaProperties;
import org.apache.amoro.properties.HiveTableProperties;
+import org.apache.amoro.server.catalog.CatalogManager;
import org.apache.amoro.server.catalog.ServerCatalog;
import org.apache.amoro.server.dashboard.ServerTableDescriptor;
import org.apache.amoro.server.dashboard.ServerTableProperties;
@@ -99,6 +100,7 @@ public class TableController {
private static final Logger LOG =
LoggerFactory.getLogger(TableController.class);
private static final long UPGRADE_INFO_EXPIRE_INTERVAL = 60 * 60 * 1000;
+ private final CatalogManager catalogManager;
private final TableService tableService;
private final ServerTableDescriptor tableDescriptor;
private final Configurations serviceConfig;
@@ -107,9 +109,11 @@ public class TableController {
private final ScheduledExecutorService tableUpgradeExecutor;
public TableController(
+ CatalogManager catalogManager,
TableService tableService,
ServerTableDescriptor tableDescriptor,
Configurations serviceConfig) {
+ this.catalogManager = catalogManager;
this.tableService = tableService;
this.tableDescriptor = tableDescriptor;
this.serviceConfig = serviceConfig;
@@ -138,7 +142,7 @@ public class TableController {
&& StringUtils.isNotBlank(database)
&& StringUtils.isNotBlank(tableName),
"catalog.database.tableName can not be empty in any element");
- Preconditions.checkState(tableService.catalogExist(catalog), "invalid catalog!");
+ Preconditions.checkState(catalogManager.catalogExist(catalog), "invalid catalog!");
ServerTableMeta serverTableMeta =
tableDescriptor.getTableDetail(
@@ -178,7 +182,7 @@ public class TableController {
&& StringUtils.isNotBlank(db)
&& StringUtils.isNotBlank(table),
"catalog.database.tableName can not be empty in any element");
- ServerCatalog serverCatalog = tableService.getServerCatalog(catalog);
+ ServerCatalog serverCatalog = catalogManager.getServerCatalog(catalog);
CatalogMeta catalogMeta = serverCatalog.getMetadata();
TableMetaStore tableMetaStore = CatalogUtil.buildMetaStore(catalogMeta);
HMSClientPool hmsClientPool =
@@ -217,7 +221,7 @@ public class TableController {
"catalog.database.tableName can not be empty in any element");
UpgradeHiveMeta upgradeHiveMeta = ctx.bodyAsClass(UpgradeHiveMeta.class);
- ServerCatalog serverCatalog = tableService.getServerCatalog(catalog);
+ ServerCatalog serverCatalog = catalogManager.getServerCatalog(catalog);
CatalogMeta catalogMeta = serverCatalog.getMetadata();
String amsUri = AmsUtil.getAMSThriftAddress(serviceConfig,
Constants.THRIFT_TABLE_SERVICE_NAME);
catalogMeta.putToCatalogProperties(CatalogMetaProperties.AMS_URI, amsUri);
@@ -518,7 +522,7 @@ public class TableController {
StringUtils.isNotBlank(catalog) && StringUtils.isNotBlank(db),
"catalog.database can not be empty in any element");
- ServerCatalog serverCatalog = tableService.getServerCatalog(catalog);
+ ServerCatalog serverCatalog = catalogManager.getServerCatalog(catalog);
Function<TableFormat, String> formatToType =
format -> {
if (format.equals(TableFormat.MIXED_HIVE) ||
format.equals(TableFormat.MIXED_ICEBERG)) {
@@ -584,7 +588,7 @@ public class TableController {
String keywords = ctx.queryParam("keywords");
List<String> dbList =
- tableService.getServerCatalog(catalog).listDatabases().stream()
+ catalogManager.getServerCatalog(catalog).listDatabases().stream()
.filter(item -> StringUtils.isBlank(keywords) ||
item.contains(keywords))
.collect(Collectors.toList());
ctx.json(OkResponse.of(dbList));
@@ -596,7 +600,7 @@ public class TableController {
* @param ctx - context for handling the request and response
*/
public void getCatalogs(Context ctx) {
- List<CatalogMeta> catalogs = tableService.listCatalogMetas();
+ List<CatalogMeta> catalogs = catalogManager.listCatalogMetas();
ctx.json(OkResponse.of(catalogs));
}
@@ -672,7 +676,7 @@ public class TableController {
&& StringUtils.isNotBlank(db)
&& StringUtils.isNotBlank(table),
"catalog.database.tableName can not be empty in any element");
- Preconditions.checkState(tableService.catalogExist(catalog), "invalid catalog!");
+ Preconditions.checkState(catalogManager.catalogExist(catalog), "invalid catalog!");
ServerTableIdentifier serverTableIdentifier =
tableService.getServerTableIdentifier(
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/CatalogMetaMapper.java
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/CatalogMetaMapper.java
index 3d47f5197..ee62866ae 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/CatalogMetaMapper.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/CatalogMetaMapper.java
@@ -74,7 +74,7 @@ public interface CatalogMetaMapper {
column = "catalog_properties",
typeHandler = Map2StringConverter.class)
})
- List<CatalogMeta> getCatalog(@Param("catalogName") String catalogName);
+ CatalogMeta getCatalog(@Param("catalogName") String catalogName);
@Insert(
"INSERT INTO "
@@ -111,6 +111,9 @@ public interface CatalogMetaMapper {
@Select("SELECT table_count FROM " + TABLE_NAME + " WHERE catalog_name = #{catalogName}")
Integer selectTableCount(@Param("catalogName") String catalogName);
+ @Select("SELECT database_count FROM " + TABLE_NAME + " WHERE catalog_name = #{catalogName}")
+ Integer selectDatabaseCount(@Param("catalogName") String catalogName);
+
@Update(
"UPDATE "
+ TABLE_NAME
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
index a43e91724..ec65bdadc 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
@@ -22,7 +22,6 @@ import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import org.apache.amoro.AmoroTable;
-import org.apache.amoro.NoSuchTableException;
import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.TableFormat;
import org.apache.amoro.TableIDWithFormat;
@@ -38,7 +37,7 @@ import org.apache.amoro.exception.IllegalMetadataException;
import org.apache.amoro.exception.ObjectNotExistsException;
import org.apache.amoro.exception.PersistenceException;
import org.apache.amoro.server.AmoroManagementConf;
-import org.apache.amoro.server.catalog.CatalogBuilder;
+import org.apache.amoro.server.catalog.CatalogManager;
import org.apache.amoro.server.catalog.ExternalCatalog;
import org.apache.amoro.server.catalog.InternalCatalog;
import org.apache.amoro.server.catalog.ServerCatalog;
@@ -46,7 +45,6 @@ import org.apache.amoro.server.manager.MetricManager;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.persistence.PersistentBase;
import org.apache.amoro.server.persistence.TableRuntimeMeta;
-import org.apache.amoro.server.persistence.mapper.CatalogMetaMapper;
import org.apache.amoro.server.persistence.mapper.TableBlockerMapper;
import org.apache.amoro.server.persistence.mapper.TableMetaMapper;
import org.apache.amoro.server.table.blocker.TableBlocker;
@@ -90,8 +88,6 @@ public class DefaultTableService extends PersistentBase
implements TableService
private static final int TABLE_BLOCKER_RETRY = 3;
private final long externalCatalogRefreshingInterval;
private final long blockerTimeout;
- private final Map<String, InternalCatalog> internalCatalogMap = new
ConcurrentHashMap<>();
- private final Map<String, ExternalCatalog> externalCatalogMap = new
ConcurrentHashMap<>();
private final Map<Long, TableRuntime> tableRuntimeMap = new
ConcurrentHashMap<>();
@@ -103,96 +99,18 @@ public class DefaultTableService extends PersistentBase
implements TableService
.build());
private final CompletableFuture<Boolean> initialized = new
CompletableFuture<>();
private final Configurations serverConfiguration;
+ private final CatalogManager catalogManager;
private RuntimeHandlerChain headHandler;
private ExecutorService tableExplorerExecutors;
- public DefaultTableService(Configurations configuration) {
+ public DefaultTableService(Configurations configuration, CatalogManager
catalogManager) {
+ this.catalogManager = catalogManager;
this.externalCatalogRefreshingInterval =
configuration.getLong(AmoroManagementConf.REFRESH_EXTERNAL_CATALOGS_INTERVAL);
this.blockerTimeout =
configuration.getLong(AmoroManagementConf.BLOCKER_TIMEOUT);
this.serverConfiguration = configuration;
}
- @Override
- public List<CatalogMeta> listCatalogMetas() {
- checkStarted();
- List<CatalogMeta> catalogs =
- internalCatalogMap.values().stream()
- .map(ServerCatalog::getMetadata)
- .collect(Collectors.toList());
- catalogs.addAll(
- externalCatalogMap.values().stream()
- .map(ServerCatalog::getMetadata)
- .collect(Collectors.toList()));
- return catalogs;
- }
-
- @Override
- public CatalogMeta getCatalogMeta(String catalogName) {
- checkStarted();
- ServerCatalog catalog = getServerCatalog(catalogName);
- return catalog.getMetadata();
- }
-
- @Override
- public boolean catalogExist(String catalogName) {
- checkStarted();
- return internalCatalogMap.containsKey(catalogName)
- || externalCatalogMap.containsKey(catalogName);
- }
-
- @Override
- public ServerCatalog getServerCatalog(String catalogName) {
- ServerCatalog catalog =
- Optional.ofNullable((ServerCatalog)
internalCatalogMap.get(catalogName))
- .orElse(externalCatalogMap.get(catalogName));
- return Optional.ofNullable(catalog)
- .orElseThrow(() -> new ObjectNotExistsException("Catalog " +
catalogName));
- }
-
- @Override
- public void createCatalog(CatalogMeta catalogMeta) {
- checkStarted();
- if (catalogExist(catalogMeta.getCatalogName())) {
- throw new AlreadyExistsException("Catalog " +
catalogMeta.getCatalogName());
- }
- doAsTransaction(
- () -> doAs(CatalogMetaMapper.class, mapper ->
mapper.insertCatalog(catalogMeta)),
- () -> initServerCatalog(catalogMeta));
- }
-
- private void initServerCatalog(CatalogMeta catalogMeta) {
- ServerCatalog catalog = CatalogBuilder.buildServerCatalog(catalogMeta,
serverConfiguration);
- if (catalog instanceof InternalCatalog) {
- internalCatalogMap.put(catalogMeta.getCatalogName(), (InternalCatalog)
catalog);
- } else {
- externalCatalogMap.put(catalogMeta.getCatalogName(), (ExternalCatalog)
catalog);
- }
- }
-
- @Override
- public void dropCatalog(String catalogName) {
- checkStarted();
- ServerCatalog serverCatalog = getServerCatalog(catalogName);
- if (serverCatalog == null) {
- throw new ObjectNotExistsException("Catalog " + catalogName);
- }
-
- // TableRuntime cleanup is responsibility by exploreExternalCatalog method
- serverCatalog.dispose();
- internalCatalogMap.remove(catalogName);
- externalCatalogMap.remove(catalogName);
- }
-
- @Override
- public void updateCatalog(CatalogMeta catalogMeta) {
- checkStarted();
- ServerCatalog catalog = getServerCatalog(catalogMeta.getCatalogName());
- validateCatalogUpdate(catalog.getMetadata(), catalogMeta);
- doAs(CatalogMetaMapper.class, mapper -> mapper.updateCatalog(catalogMeta));
- catalog.updateMetadata(catalogMeta);
- }
-
@Override
public void dropTableMetadata(TableIdentifier tableIdentifier, boolean
deleteData) {
checkStarted();
@@ -206,7 +124,8 @@ public class DefaultTableService extends PersistentBase
implements TableService
throw new IllegalMetadataException("database is blank");
}
- InternalCatalog internalCatalog =
getInternalCatalog(tableIdentifier.getCatalog());
+ InternalCatalog internalCatalog =
+ catalogManager.getInternalCatalog(tableIdentifier.getCatalog());
String database = tableIdentifier.getDatabase();
String table = tableIdentifier.getTableName();
if (!internalCatalog.tableExists(database, table)) {
@@ -227,7 +146,7 @@ public class DefaultTableService extends PersistentBase
implements TableService
@Override
public void createTable(String catalogName, TableMetadata tableMetadata) {
checkStarted();
- InternalCatalog catalog = getInternalCatalog(catalogName);
+ InternalCatalog catalog = catalogManager.getInternalCatalog(catalogName);
String database = tableMetadata.getTableIdentifier().getDatabase();
String table = tableMetadata.getTableIdentifier().getTableName();
if (catalog.tableExists(database, table)) {
@@ -248,7 +167,8 @@ public class DefaultTableService extends PersistentBase
implements TableService
@Override
public AmoroTable<?> loadTable(ServerTableIdentifier tableIdentifier) {
checkStarted();
- return getServerCatalog(tableIdentifier.getCatalog())
+ return catalogManager
+ .getServerCatalog(tableIdentifier.getCatalog())
.loadTable(tableIdentifier.getDatabase(),
tableIdentifier.getTableName());
}
@@ -372,11 +292,6 @@ public class DefaultTableService extends PersistentBase
implements TableService
}
}
- public InternalCatalog getInternalCatalog(String catalogName) {
- return Optional.ofNullable(internalCatalogMap.get(catalogName))
- .orElseThrow(() -> new ObjectNotExistsException("Catalog " +
catalogName));
- }
-
@Override
public void addHandlerChain(RuntimeHandlerChain handler) {
checkNotStarted();
@@ -404,8 +319,6 @@ public class DefaultTableService extends PersistentBase
implements TableService
@Override
public void initialize() {
checkNotStarted();
- List<CatalogMeta> catalogMetas = getAs(CatalogMetaMapper.class,
CatalogMetaMapper::getCatalogs);
- catalogMetas.forEach(this::initServerCatalog);
List<TableRuntimeMeta> tableRuntimeMetaList =
getAs(TableMetaMapper.class, TableMetaMapper::selectTableRuntimeMetas);
@@ -461,26 +374,6 @@ public class DefaultTableService extends PersistentBase
implements TableService
mapper.selectTableIdentifier(id.getCatalog(), id.getDatabase(),
id.getTableName()));
}
- private ServerTableIdentifier getOrSyncServerTableIdentifier(TableIdentifier
id) {
- ServerTableIdentifier serverTableIdentifier = getServerTableIdentifier(id);
- if (serverTableIdentifier != null) {
- return serverTableIdentifier;
- }
- ServerCatalog serverCatalog = getServerCatalog(id.getCatalog());
- if (serverCatalog instanceof InternalCatalog) {
- return null;
- }
- try {
- AmoroTable<?> table = serverCatalog.loadTable(id.database,
id.getTableName());
- TableIdentity identity =
- new TableIdentity(id.getDatabase(), id.getTableName(),
table.format());
- syncTable((ExternalCatalog) serverCatalog, identity);
- return getServerTableIdentifier(id);
- } catch (NoSuchTableException e) {
- return null;
- }
- }
-
@Override
public TableRuntime getRuntime(Long tableId) {
checkStarted();
@@ -509,8 +402,11 @@ public class DefaultTableService extends PersistentBase
implements TableService
throw new IllegalStateException("TableService is not initialized");
}
long start = System.currentTimeMillis();
- LOG.info("Syncing external catalogs: {}", String.join(",",
externalCatalogMap.keySet()));
- for (ExternalCatalog externalCatalog : externalCatalogMap.values()) {
+ List<ExternalCatalog> externalCatalogs =
catalogManager.getExternalCatalogs();
+ List<String> externalCatalogNames =
+
externalCatalogs.stream().map(ExternalCatalog::name).collect(Collectors.toList());
+ LOG.info("Syncing external catalogs: {}", String.join(",",
externalCatalogNames));
+ for (ExternalCatalog externalCatalog : externalCatalogs) {
try {
final List<CompletableFuture<Set<TableIdentity>>>
tableIdentifiersFutures =
Lists.newArrayList();
@@ -618,7 +514,9 @@ public class DefaultTableService extends PersistentBase
implements TableService
// It is permissible to have some erroneous states in the middle, as long as the final data is
// consistent.
Set<String> catalogNames =
-
listCatalogMetas().stream().map(CatalogMeta::getCatalogName).collect(Collectors.toSet());
+ catalogManager.listCatalogMetas().stream()
+ .map(CatalogMeta::getCatalogName)
+ .collect(Collectors.toSet());
for (TableRuntime tableRuntime : tableRuntimeMap.values()) {
if
(!catalogNames.contains(tableRuntime.getTableIdentifier().getCatalog())) {
disposeTable(tableRuntime.getTableIdentifier());
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableService.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableService.java
index e069f235f..ff359aae1 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableService.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableService.java
@@ -22,7 +22,6 @@ import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.api.BlockableOperation;
import org.apache.amoro.api.Blocker;
import org.apache.amoro.api.TableIdentifier;
-import org.apache.amoro.server.catalog.CatalogService;
import org.apache.amoro.server.persistence.TableRuntimeMeta;
import org.apache.commons.lang3.tuple.Pair;
@@ -31,7 +30,7 @@ import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
-public interface TableService extends CatalogService, TableManager {
+public interface TableService extends TableManager {
void initialize();
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/terminal/TerminalManager.java
b/amoro-ams/src/main/java/org/apache/amoro/server/terminal/TerminalManager.java
index 64dba7a82..ee4d95315 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/terminal/TerminalManager.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/terminal/TerminalManager.java
@@ -25,6 +25,7 @@ import org.apache.amoro.config.ConfigOptions;
import org.apache.amoro.config.Configurations;
import org.apache.amoro.properties.CatalogMetaProperties;
import org.apache.amoro.server.AmoroManagementConf;
+import org.apache.amoro.server.catalog.CatalogManager;
import org.apache.amoro.server.catalog.CatalogType;
import org.apache.amoro.server.dashboard.model.LatestSessionInfo;
import org.apache.amoro.server.dashboard.model.LogInfo;
@@ -59,6 +60,7 @@ public class TerminalManager {
private final Configurations serviceConfig;
private final AtomicLong threadPoolCount = new AtomicLong();
+ private final CatalogManager catalogManager;
private final TableService tableService;
private final TerminalSessionFactory sessionFactory;
private final int resultLimits;
@@ -80,8 +82,10 @@ public class TerminalManager {
new LinkedBlockingQueue<>(),
r -> new Thread(null, r, "terminal-execute-" +
threadPoolCount.incrementAndGet()));
- public TerminalManager(Configurations conf, TableService tableService) {
+ public TerminalManager(
+ Configurations conf, CatalogManager catalogManager, TableService
tableService) {
this.serviceConfig = conf;
+ this.catalogManager = catalogManager;
this.tableService = tableService;
this.resultLimits =
conf.getInteger(AmoroManagementConf.TERMINAL_RESULT_LIMIT);
this.stopOnError =
conf.getBoolean(AmoroManagementConf.TERMINAL_STOP_ON_ERROR);
@@ -101,7 +105,7 @@ public class TerminalManager {
* @return - sessionId, session refer to a sql execution context
*/
public String executeScript(String terminalId, String catalog, String
script) {
- CatalogMeta catalogMeta = tableService.getCatalogMeta(catalog);
+ CatalogMeta catalogMeta = catalogManager.getCatalogMeta(catalog);
TableMetaStore metaStore = getCatalogTableMetaStore(catalogMeta);
String sessionId = getSessionId(terminalId, metaStore, catalog);
String connectorType = catalogConnectorType(catalogMeta);
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java
b/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java
index accf1c609..ed59ea58a 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java
@@ -29,6 +29,7 @@ import org.apache.amoro.mixed.MixedFormatCatalog;
import org.apache.amoro.optimizer.standalone.StandaloneOptimizer;
import org.apache.amoro.properties.CatalogMetaProperties;
import org.apache.amoro.resource.ResourceGroup;
+import org.apache.amoro.server.catalog.DefaultCatalogManager;
import org.apache.amoro.server.catalog.ServerCatalog;
import org.apache.amoro.server.resource.OptimizerManager;
import org.apache.amoro.server.resource.ResourceContainers;
@@ -67,6 +68,7 @@ public class AmsEnvironment {
private static final String OPTIMIZE_GROUP = "default";
private final AmoroServiceContainer serviceContainer;
private Configurations serviceConfig;
+ private DefaultCatalogManager catalogManager;
private DefaultTableService tableService;
private final AtomicBoolean amsExit;
private int tableServiceBindPort;
@@ -139,6 +141,9 @@ public class AmsEnvironment {
startAms();
DynFields.UnboundField<DefaultTableService> amsTableServiceField =
DynFields.builder().hiddenImpl(AmoroServiceContainer.class,
"tableService").build();
+ DynFields.UnboundField<DefaultCatalogManager> amsCatalogManagerField =
+ DynFields.builder().hiddenImpl(AmoroServiceContainer.class,
"catalogManager").build();
+ catalogManager = amsCatalogManagerField.bind(serviceContainer).get();
tableService = amsTableServiceField.bind(serviceContainer).get();
DynFields.UnboundField<CompletableFuture<Boolean>> tableServiceField =
DynFields.builder().hiddenImpl(DefaultTableService.class,
"initialized").build();
@@ -194,7 +199,7 @@ public class AmsEnvironment {
}
public boolean tableExist(TableIdentifier tableIdentifier) {
- ServerCatalog catalog =
tableService.getServerCatalog(tableIdentifier.getCatalog());
+ ServerCatalog catalog =
catalogManager.getServerCatalog(tableIdentifier.getCatalog());
return catalog.tableExists(tableIdentifier.getDatabase(),
tableIdentifier.getTableName());
}
@@ -221,7 +226,7 @@ public class AmsEnvironment {
properties,
TableFormat.ICEBERG);
- tableService.createCatalog(catalogMeta);
+ catalogManager.createCatalog(catalogMeta);
}
private void createExternalIcebergCatalog() {
@@ -235,7 +240,7 @@ public class AmsEnvironment {
CatalogMetaProperties.CATALOG_TYPE_HADOOP,
properties,
TableFormat.ICEBERG);
- tableService.createCatalog(catalogMeta);
+ catalogManager.createCatalog(catalogMeta);
}
private void createInternalMixIcebergCatalog() {
@@ -249,7 +254,7 @@ public class AmsEnvironment {
CatalogMetaProperties.CATALOG_TYPE_AMS,
properties,
TableFormat.MIXED_ICEBERG);
- tableService.createCatalog(catalogMeta);
+ catalogManager.createCatalog(catalogMeta);
catalogs.put(
INTERNAL_MIXED_ICEBERG_CATALOG,
CatalogLoader.load(getTableServiceUrl() + "/" +
INTERNAL_MIXED_ICEBERG_CATALOG));
@@ -260,7 +265,7 @@ public class AmsEnvironment {
CatalogMeta catalogMeta =
CatalogTestHelpers.buildHiveCatalogMeta(
MIXED_HIVE_CATALOG, properties, testHMS.hiveConf(),
TableFormat.MIXED_HIVE);
- tableService.createCatalog(catalogMeta);
+ catalogManager.createCatalog(catalogMeta);
catalogs.put(
MIXED_HIVE_CATALOG, CatalogLoader.load(getTableServiceUrl() + "/" +
MIXED_HIVE_CATALOG));
}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/RestCatalogServiceTestBase.java
b/amoro-ams/src/test/java/org/apache/amoro/server/RestCatalogServiceTestBase.java
index 99b1853fa..fe798dc78 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/RestCatalogServiceTestBase.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/RestCatalogServiceTestBase.java
@@ -23,6 +23,7 @@ import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.TableFormat;
import org.apache.amoro.api.CatalogMeta;
import org.apache.amoro.properties.CatalogMetaProperties;
+import org.apache.amoro.server.catalog.CatalogManager;
import org.apache.amoro.server.catalog.InternalCatalog;
import org.apache.amoro.server.table.TableMetadata;
import org.apache.amoro.server.table.TableRuntime;
@@ -75,6 +76,7 @@ public abstract class RestCatalogServiceTestBase {
protected abstract String catalogName();
+ protected CatalogManager catalogManager;
protected TableService tableService;
protected InternalCatalog serverCatalog;
@@ -82,8 +84,9 @@ public abstract class RestCatalogServiceTestBase {
@BeforeEach
public void before() {
+ catalogManager = ams.serviceContainer().getCatalogManager();
tableService = ams.serviceContainer().getTableService();
- serverCatalog = (InternalCatalog)
tableService.getServerCatalog(catalogName());
+ serverCatalog = catalogManager.getInternalCatalog(catalogName());
location =
serverCatalog.getMetadata().getCatalogProperties().get(CatalogMetaProperties.KEY_WAREHOUSE)
+ "/"
@@ -114,7 +117,7 @@ public abstract class RestCatalogServiceTestBase {
}
protected TableMetadata getTableMetadata(TableIdentifier identifier) {
- InternalCatalog internalCatalog =
tableService.getInternalCatalog(identifier.getCatalog());
+ InternalCatalog internalCatalog =
catalogManager.getInternalCatalog(identifier.getCatalog());
return internalCatalog.loadTableMetadata(identifier.getDatabase(),
identifier.getTableName());
}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/TestInternalIcebergCatalogService.java
b/amoro-ams/src/test/java/org/apache/amoro/server/TestInternalIcebergCatalogService.java
index 598254073..936b54c5b 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/TestInternalIcebergCatalogService.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/TestInternalIcebergCatalogService.java
@@ -75,7 +75,7 @@ public class TestInternalIcebergCatalogService extends
RestCatalogServiceTestBas
CatalogMeta oldMeta = meta.deepCopy();
meta.putToCatalogProperties("cache-enabled", "false");
meta.putToCatalogProperties("cache.expiration-interval-ms", "10000");
- serverCatalog.updateMetadata(meta);
+ catalogManager.updateCatalog(meta);
String warehouseInAMS =
meta.getCatalogProperties().get(CatalogMetaProperties.KEY_WAREHOUSE);
Map<String, String> clientSideConfiguration = Maps.newHashMap();
@@ -91,7 +91,7 @@ public class TestInternalIcebergCatalogService extends
RestCatalogServiceTestBas
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
- serverCatalog.updateMetadata(oldMeta);
+ catalogManager.updateCatalog(oldMeta);
}
}
}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/catalog/TableCatalogTestBase.java
b/amoro-ams/src/test/java/org/apache/amoro/server/catalog/TableCatalogTestBase.java
index 3a7eb3731..a50d911f3 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/catalog/TableCatalogTestBase.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/catalog/TableCatalogTestBase.java
@@ -52,13 +52,13 @@ public class TableCatalogTestBase extends
TableServiceTestBase {
amoroCatalogTestHelper.initWarehouse(path);
amoroCatalogTestHelper.initHiveConf(TEST_HMS.getHiveConf());
this.amoroCatalog = amoroCatalogTestHelper.amoroCatalog();
- tableService().createCatalog(amoroCatalogTestHelper.getCatalogMeta());
+ CATALOG_MANAGER.createCatalog(amoroCatalogTestHelper.getCatalogMeta());
this.originalCatalog = amoroCatalogTestHelper.originalCatalog();
}
@After
public void clean() {
- tableService().dropCatalog(amoroCatalogTestHelper.catalogName());
+ CATALOG_MANAGER.dropCatalog(amoroCatalogTestHelper.catalogName());
amoroCatalogTestHelper.clean();
}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/catalog/TestServerCatalog.java
b/amoro-ams/src/test/java/org/apache/amoro/server/catalog/TestServerCatalog.java
index 1e41c1ede..675ba6d46 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/catalog/TestServerCatalog.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/catalog/TestServerCatalog.java
@@ -103,7 +103,7 @@ public class TestServerCatalog extends TableCatalogTestBase
{
metadata
.getCatalogProperties()
.put(CatalogMetaProperties.KEY_TABLE_FILTER, dbWithFilter + "." +
tableWithFilter1);
- getServerCatalog().updateMetadata(metadata);
+ CATALOG_MANAGER.updateCatalog(metadata);
Assert.assertEquals(1, getServerCatalog().listTables(dbWithFilter).size());
Assert.assertEquals(
tableWithFilter1,
@@ -113,20 +113,20 @@ public class TestServerCatalog extends
TableCatalogTestBase {
metadata
.getCatalogProperties()
.put(CatalogMetaProperties.KEY_TABLE_FILTER, dbWithFilter + "\\." +
".+");
- getServerCatalog().updateMetadata(metadata2);
+ CATALOG_MANAGER.updateCatalog(metadata2);
Assert.assertEquals(2, getServerCatalog().listTables(dbWithFilter).size());
CatalogMeta metadata3 = getServerCatalog().getMetadata();
metadata
.getCatalogProperties()
.put(CatalogMetaProperties.KEY_TABLE_FILTER, testDatabaseName + "\\."
+ ".+");
- getServerCatalog().updateMetadata(metadata3);
+ CATALOG_MANAGER.updateCatalog(metadata3);
Assert.assertEquals(1,
getServerCatalog().listTables(testDatabaseName).size());
Assert.assertTrue(getServerCatalog().listTables(dbWithFilter).isEmpty());
CatalogMeta metadata4 = getServerCatalog().getMetadata();
metadata.getCatalogProperties().remove(CatalogMetaProperties.KEY_TABLE_FILTER);
- getServerCatalog().updateMetadata(metadata4);
+ CATALOG_MANAGER.updateCatalog(metadata4);
}
@Test
@@ -135,6 +135,6 @@ public class TestServerCatalog extends TableCatalogTestBase
{
}
private ServerCatalog getServerCatalog() {
- return
tableService().getServerCatalog(getAmoroCatalogTestHelper().catalogName());
+ return
CATALOG_MANAGER.getServerCatalog(getAmoroCatalogTestHelper().catalogName());
}
}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestIcebergHadoopOptimizing.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestIcebergHadoopOptimizing.java
index ec8dbd01f..29ab60d53 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestIcebergHadoopOptimizing.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestIcebergHadoopOptimizing.java
@@ -407,7 +407,7 @@ public class TestIcebergHadoopOptimizing extends
AbstractOptimizingTest {
private Table createIcebergTable(String catalog, PartitionSpec spec, int
formatVersion) {
ServerCatalog serverCatalog =
- amsEnv.serviceContainer().getTableService().getServerCatalog(catalog);
+
amsEnv.serviceContainer().getCatalogManager().getServerCatalog(catalog);
if (serverCatalog instanceof InternalCatalog) {
this.serverCatalog = (InternalCatalog) serverCatalog;
if (!this.serverCatalog.databaseExists(DATABASE)) {
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/table/AMSTableTestBase.java
b/amoro-ams/src/test/java/org/apache/amoro/server/table/AMSTableTestBase.java
index df7e478cf..2b13018f1 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/table/AMSTableTestBase.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/table/AMSTableTestBase.java
@@ -96,7 +96,7 @@ public class AMSTableTestBase extends TableServiceTestBase {
}
}
- tableService().createCatalog(catalogMeta);
+ CATALOG_MANAGER.createCatalog(catalogMeta);
try {
Database database = new Database();
database.setName(TableTestHelper.TEST_DB_NAME);
@@ -120,7 +120,7 @@ public class AMSTableTestBase extends TableServiceTestBase {
dropDatabase();
}
if (catalogMeta != null) {
- tableService().dropCatalog(catalogMeta.getCatalogName());
+ CATALOG_MANAGER.dropCatalog(catalogMeta.getCatalogName());
TEST_HMS.getHiveClient().dropDatabase(TableTestHelper.TEST_DB_NAME,
false, true);
}
}
@@ -146,7 +146,7 @@ public class AMSTableTestBase extends TableServiceTestBase {
protected void createDatabase() {
if (externalCatalog == null) {
InternalCatalog catalog =
- tableService().getInternalCatalog(TableTestHelper.TEST_CATALOG_NAME);
+
CATALOG_MANAGER.getInternalCatalog(TableTestHelper.TEST_CATALOG_NAME);
if (!catalog.listDatabases().contains(TableTestHelper.TEST_DB_NAME)) {
catalog.createDatabase(TableTestHelper.TEST_DB_NAME);
}
@@ -161,7 +161,7 @@ public class AMSTableTestBase extends TableServiceTestBase {
protected void dropDatabase() {
if (externalCatalog == null) {
InternalCatalog catalog =
- tableService().getInternalCatalog(TableTestHelper.TEST_CATALOG_NAME);
+
CATALOG_MANAGER.getInternalCatalog(TableTestHelper.TEST_CATALOG_NAME);
catalog.dropDatabase(TableTestHelper.TEST_DB_NAME);
} else {
externalCatalog.dropDatabase(TableTestHelper.TEST_DB_NAME);
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TableServiceTestBase.java
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TableServiceTestBase.java
index 8dd33ff77..0138e4e4f 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TableServiceTestBase.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TableServiceTestBase.java
@@ -22,6 +22,7 @@ import org.apache.amoro.config.Configurations;
import org.apache.amoro.resource.ResourceGroup;
import org.apache.amoro.server.AmoroManagementConf;
import org.apache.amoro.server.DefaultOptimizingService;
+import org.apache.amoro.server.catalog.DefaultCatalogManager;
import org.apache.amoro.server.manager.EventsManager;
import org.apache.amoro.server.manager.MetricManager;
import org.junit.AfterClass;
@@ -33,6 +34,7 @@ public abstract class TableServiceTestBase {
@ClassRule public static DerbyPersistence DERBY = new DerbyPersistence();
+ protected static DefaultCatalogManager CATALOG_MANAGER = null;
private static DefaultTableService TABLE_SERVICE = null;
private static DefaultOptimizingService OPTIMIZING_SERVICE = null;
@@ -41,8 +43,10 @@ public abstract class TableServiceTestBase {
try {
Configurations configurations = new Configurations();
configurations.set(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT, 800L);
- TABLE_SERVICE = new DefaultTableService(new Configurations());
- OPTIMIZING_SERVICE = new DefaultOptimizingService(configurations,
TABLE_SERVICE);
+ CATALOG_MANAGER = new DefaultCatalogManager(configurations);
+ TABLE_SERVICE = new DefaultTableService(new Configurations(),
CATALOG_MANAGER);
+ OPTIMIZING_SERVICE =
+ new DefaultOptimizingService(configurations, CATALOG_MANAGER,
TABLE_SERVICE);
TABLE_SERVICE.addHandlerChain(OPTIMIZING_SERVICE.getTableRuntimeHandler());
TABLE_SERVICE.initialize();
try {
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestCatalogService.java
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestCatalogService.java
index fd6a5def6..48362f65d 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestCatalogService.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestCatalogService.java
@@ -60,18 +60,18 @@ public class TestCatalogService extends
TableServiceTestBase {
public void testCreateAndDropCatalog() {
CatalogMeta catalogMeta = catalogTestHelper.buildCatalogMeta("/tmp");
// test create catalog
- tableService().createCatalog(catalogMeta);
+ CATALOG_MANAGER.createCatalog(catalogMeta);
// test create duplicate catalog
Assert.assertThrows(
- AlreadyExistsException.class, () ->
tableService().createCatalog(catalogMeta));
+ AlreadyExistsException.class, () ->
CATALOG_MANAGER.createCatalog(catalogMeta));
// test get catalog
- CatalogMeta readCatalogMeta =
tableService().getCatalogMeta(catalogMeta.getCatalogName());
+ CatalogMeta readCatalogMeta =
CATALOG_MANAGER.getCatalogMeta(catalogMeta.getCatalogName());
Assert.assertEquals(catalogMeta, readCatalogMeta);
// test get catalog list
- List<CatalogMeta> catalogMetas = tableService().listCatalogMetas();
+ List<CatalogMeta> catalogMetas = CATALOG_MANAGER.listCatalogMetas();
Assert.assertEquals(1, catalogMetas.size());
Assert.assertEquals(
catalogMeta,
@@ -81,57 +81,57 @@ public class TestCatalogService extends
TableServiceTestBase {
.orElseThrow(() -> new IllegalStateException("Cannot find expect
catalog")));
// test catalogExist
-
Assert.assertTrue(tableService().catalogExist(catalogMeta.getCatalogName()));
+
Assert.assertTrue(CATALOG_MANAGER.catalogExist(catalogMeta.getCatalogName()));
// test drop catalog
- tableService().dropCatalog(catalogMeta.getCatalogName());
+ CATALOG_MANAGER.dropCatalog(catalogMeta.getCatalogName());
// test drop not existed catalog
Assert.assertThrows(
ObjectNotExistsException.class,
- () -> tableService().getCatalogMeta(catalogMeta.getCatalogName()));
+ () -> CATALOG_MANAGER.getCatalogMeta(catalogMeta.getCatalogName()));
-
Assert.assertFalse(tableService().catalogExist(catalogMeta.getCatalogName()));
+
Assert.assertFalse(CATALOG_MANAGER.catalogExist(catalogMeta.getCatalogName()));
}
@Test
public void testUpdateCatalog() {
CatalogMeta catalogMeta = catalogTestHelper.buildCatalogMeta("/tmp");
- tableService().createCatalog(catalogMeta);
+ CATALOG_MANAGER.createCatalog(catalogMeta);
CatalogMeta updateCatalogMeta = new CatalogMeta(catalogMeta);
updateCatalogMeta.getCatalogProperties().put("k2", "V2");
updateCatalogMeta.getCatalogProperties().put("k3", "v3");
- tableService().updateCatalog(updateCatalogMeta);
- CatalogMeta getCatalogMeta =
tableService().getCatalogMeta(catalogMeta.getCatalogName());
+ CATALOG_MANAGER.updateCatalog(updateCatalogMeta);
+ CatalogMeta getCatalogMeta =
CATALOG_MANAGER.getCatalogMeta(catalogMeta.getCatalogName());
Assert.assertEquals("V2", getCatalogMeta.getCatalogProperties().get("k2"));
Assert.assertEquals("v3", getCatalogMeta.getCatalogProperties().get("k3"));
Assert.assertEquals(
- updateCatalogMeta,
tableService().getCatalogMeta(catalogMeta.getCatalogName()));
+ updateCatalogMeta,
CATALOG_MANAGER.getCatalogMeta(catalogMeta.getCatalogName()));
// test update catalog type
final CatalogMeta updateCatalogMeta2 = new CatalogMeta(updateCatalogMeta);
updateCatalogMeta2.setCatalogType(CatalogMetaProperties.CATALOG_TYPE_CUSTOM);
Assert.assertThrows(
- IllegalMetadataException.class, () ->
tableService().updateCatalog(updateCatalogMeta2));
+ IllegalMetadataException.class, () ->
CATALOG_MANAGER.updateCatalog(updateCatalogMeta2));
// test update unknown catalog
- tableService().dropCatalog(catalogMeta.getCatalogName());
+ CATALOG_MANAGER.dropCatalog(catalogMeta.getCatalogName());
Assert.assertThrows(
- ObjectNotExistsException.class, () ->
tableService().updateCatalog(catalogMeta));
+ ObjectNotExistsException.class, () ->
CATALOG_MANAGER.updateCatalog(catalogMeta));
}
@Test
public void testDropCatalogWithDatabase() {
Assume.assumeTrue(catalogTestHelper.tableFormat().equals(TableFormat.MIXED_ICEBERG));
CatalogMeta catalogMeta = catalogTestHelper.buildCatalogMeta("/tmp");
- tableService().createCatalog(catalogMeta);
- InternalCatalog catalog =
tableService().getInternalCatalog(catalogMeta.getCatalogName());
+ CATALOG_MANAGER.createCatalog(catalogMeta);
+ InternalCatalog catalog =
CATALOG_MANAGER.getInternalCatalog(catalogMeta.getCatalogName());
catalog.createDatabase("test_db");
Assert.assertThrows(
IllegalMetadataException.class,
- () -> tableService().dropCatalog(catalogMeta.getCatalogName()));
+ () -> CATALOG_MANAGER.dropCatalog(catalogMeta.getCatalogName()));
catalog.dropDatabase("test_db");
- tableService().dropCatalog(catalogMeta.getCatalogName());
+ CATALOG_MANAGER.dropCatalog(catalogMeta.getCatalogName());
}
}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDatabaseService.java
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDatabaseService.java
index 885db19b4..aadac0f92 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDatabaseService.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDatabaseService.java
@@ -56,7 +56,7 @@ public class TestDatabaseService extends AMSTableTestBase {
@Test
public void testCreateAndDropDatabase() {
- InternalCatalog catalog =
tableService().getInternalCatalog(TEST_CATALOG_NAME);
+ InternalCatalog catalog =
CATALOG_MANAGER.getInternalCatalog(TEST_CATALOG_NAME);
// test create database
catalog.createDatabase(TEST_DB_NAME);
@@ -78,7 +78,7 @@ public class TestDatabaseService extends AMSTableTestBase {
public void testDropDatabaseWithTable() {
Assume.assumeTrue(catalogTestHelper().tableFormat().equals(TableFormat.MIXED_ICEBERG));
Assume.assumeTrue(catalogTestHelper().isInternalCatalog());
- InternalCatalog catalog =
tableService().getInternalCatalog(TEST_CATALOG_NAME);
+ InternalCatalog catalog =
CATALOG_MANAGER.getInternalCatalog(TEST_CATALOG_NAME);
catalog.createDatabase(TEST_DB_NAME);
createTable();
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableRuntimeHandler.java
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableRuntimeHandler.java
index 641ac0e86..8329d8e7f 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableRuntimeHandler.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableRuntimeHandler.java
@@ -67,7 +67,7 @@ public class TestTableRuntimeHandler extends AMSTableTestBase
{
@Test
public void testInitialize() throws Exception {
- tableService = new DefaultTableService(new Configurations());
+ tableService = new DefaultTableService(new Configurations(),
CATALOG_MANAGER);
TestHandler handler = new TestHandler();
tableService.addHandlerChain(handler);
tableService.initialize();
@@ -86,7 +86,7 @@ public class TestTableRuntimeHandler extends AMSTableTestBase
{
Assert.assertTrue(handler.isDisposed());
// initialize with a history table
- tableService = new DefaultTableService(new Configurations());
+ tableService = new DefaultTableService(new Configurations(),
CATALOG_MANAGER);
handler = new TestHandler();
tableService.addHandlerChain(handler);
tableService.initialize();
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableService.java
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableService.java
index a1e7a59a0..b20e8f6f2 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableService.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableService.java
@@ -74,7 +74,7 @@ public class TestTableService extends AMSTableTestBase {
@Test
public void testCreateAndDropTable() {
- ServerCatalog serverCatalog =
tableService().getServerCatalog(TEST_CATALOG_NAME);
+ ServerCatalog serverCatalog =
CATALOG_MANAGER.getServerCatalog(TEST_CATALOG_NAME);
InternalCatalog internalCatalog =
catalogTestHelper().isInternalCatalog() ? (InternalCatalog)
serverCatalog : null;
@@ -94,7 +94,7 @@ public class TestTableService extends AMSTableTestBase {
// test list tables
List<TableIDWithFormat> tableIdentifierList =
-
tableService().getServerCatalog(TEST_CATALOG_NAME).listTables(TEST_DB_NAME);
+
CATALOG_MANAGER.getServerCatalog(TEST_CATALOG_NAME).listTables(TEST_DB_NAME);
Assert.assertEquals(1, tableIdentifierList.size());
Assert.assertEquals(
tableMeta().getTableIdentifier(),
diff --git a/dist/src/main/amoro-bin/conf/config.yaml
b/dist/src/main/amoro-bin/conf/config.yaml
index 721d990da..0e8c6f5cb 100644
--- a/dist/src/main/amoro-bin/conf/config.yaml
+++ b/dist/src/main/amoro-bin/conf/config.yaml
@@ -91,6 +91,9 @@ ams:
table-manifest-io:
thread-count: 20
+ catalog-meta-cache:
+ expiration-interval: 60s
+
database:
type: derby
jdbc-driver-class: org.apache.derby.jdbc.EmbeddedDriver