This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit db4cb33eab06a1b2c602742122554137105a5bd5 Author: Mingyu Chen <[email protected]> AuthorDate: Mon Apr 15 22:25:35 2024 +0800 [refactor](refresh-catalog) refactor the refresh catalog code (#33653) To unify the code. In previous, we do catalog refresh in `CatalogMgr`, but do database and table refresh in `RefreshMgr`, which is very confusing. This PR move all `refresh` related code from CatalogMgr to RefreshMgr. No logic is changed in this PR. --- .../main/java/org/apache/doris/catalog/Env.java | 2 +- .../org/apache/doris/catalog/RefreshManager.java | 187 +++++++++++++++++--- .../apache/doris/common/util/PropertyAnalyzer.java | 2 +- .../org/apache/doris/datasource/CatalogMgr.java | 188 --------------------- .../apache/doris/datasource/ExternalObjectLog.java | 1 - .../datasource/hive/event/AlterPartitionEvent.java | 4 +- .../datasource/hive/event/AlterTableEvent.java | 6 +- .../doris/datasource/hive/event/InsertEvent.java | 6 +- .../hive/event/MetastoreEventsProcessor.java | 2 +- .../apache/doris/job/extensions/mtmv/MTMVTask.java | 5 +- .../java/org/apache/doris/mysql/MysqlProto.java | 2 +- .../plans/commands/insert/HiveInsertExecutor.java | 4 +- .../java/org/apache/doris/persist/EditLog.java | 6 +- .../java/org/apache/doris/qe/ConnectProcessor.java | 2 +- .../main/java/org/apache/doris/qe/DdlExecutor.java | 2 +- .../apache/doris/catalog/RefreshCatalogTest.java | 142 ---------------- .../doris/datasource/RefreshCatalogTest.java | 90 ++++++++++ 17 files changed, 277 insertions(+), 374 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 08a068f44c1..82f3c1a0b00 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -5062,7 +5062,7 @@ public class Env { // Switch catalog of this sesseion. public void changeCatalog(ConnectContext ctx, String catalogName) throws DdlException { - CatalogIf catalogIf = catalogMgr.getCatalogNullable(catalogName); + CatalogIf catalogIf = catalogMgr.getCatalog(catalogName); if (catalogIf == null) { throw new DdlException(ErrorCode.ERR_UNKNOWN_CATALOG.formatErrorMsg(catalogName), ErrorCode.ERR_UNKNOWN_CATALOG); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java index ffa594f7b5a..39efce4c70f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java @@ -23,16 +23,23 @@ import org.apache.doris.analysis.RefreshTableStmt; import org.apache.doris.common.DdlException; import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.UserException; +import org.apache.doris.datasource.CatalogFactory; import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.datasource.CatalogLog; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.ExternalDatabase; import org.apache.doris.datasource.ExternalObjectLog; +import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.InternalCatalog; +import org.apache.doris.datasource.hive.HMSExternalTable; +import org.apache.doris.persist.OperationType; import org.apache.doris.qe.DdlExecutor; import com.google.common.collect.Maps; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.List; import java.util.Map; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -48,53 +55,191 @@ public class RefreshManager { // the original refresh time and the current remaining time of the catalog private Map<Long, Integer[]> refreshMap = Maps.newConcurrentMap(); - public void handleRefreshTable(RefreshTableStmt stmt) throws UserException { - String catalogName = stmt.getCtl(); - String dbName = stmt.getDbName(); - String tableName = stmt.getTblName(); - Env env = Env.getCurrentEnv(); - - CatalogIf catalog = catalogName != null ? env.getCatalogMgr().getCatalog(catalogName) : env.getCurrentCatalog(); + // Refresh catalog + public void handleRefreshCatalog(RefreshCatalogStmt stmt) throws UserException { + CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalogOrAnalysisException(stmt.getCatalogName()); + CatalogLog log = CatalogFactory.createCatalogLog(catalog.getId(), stmt); + refreshCatalogInternal(catalog, log.isInvalidCache()); + Env.getCurrentEnv().getEditLog().logCatalogLog(OperationType.OP_REFRESH_CATALOG, log); + } + public void replayRefreshCatalog(CatalogLog log) { + CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(log.getCatalogId()); if (catalog == null) { - throw new DdlException("Catalog " + catalogName + " doesn't exist."); + LOG.warn("failed to find catalog replaying refresh catalog {}", log.getCatalogId()); + return; } + refreshCatalogInternal(catalog, log.isInvalidCache()); + } - // Process external catalog table refresh - env.getCatalogMgr().refreshExternalTable(dbName, tableName, catalogName, false); - LOG.info("Successfully refresh table: {} from db: {}", tableName, dbName); + private void refreshCatalogInternal(CatalogIf catalog, boolean invalidCache) { + String catalogName = catalog.getName(); + if (!catalogName.equals(InternalCatalog.INTERNAL_CATALOG_NAME)) { + ((ExternalCatalog) catalog).onRefresh(invalidCache); + LOG.info("refresh catalog {} with invalidCache {}", catalogName, invalidCache); + } } + // Refresh database public void handleRefreshDb(RefreshDbStmt stmt) throws DdlException { String catalogName = stmt.getCatalogName(); String dbName = stmt.getDbName(); Env env = Env.getCurrentEnv(); CatalogIf catalog = catalogName != null ? env.getCatalogMgr().getCatalog(catalogName) : env.getCurrentCatalog(); - if (catalog == null) { throw new DdlException("Catalog " + catalogName + " doesn't exist."); } + if (!(catalog instanceof ExternalCatalog)) { + throw new DdlException("Only support refresh database in external catalog"); + } + DatabaseIf db = catalog.getDbOrDdlException(dbName); + ((ExternalDatabase) db).setUnInitialized(stmt.isInvalidCache()); + + ExternalObjectLog log = new ExternalObjectLog(); + log.setCatalogId(catalog.getId()); + log.setDbId(db.getId()); + log.setInvalidCache(stmt.isInvalidCache()); + Env.getCurrentEnv().getEditLog().logRefreshExternalDb(log); + } + + public void replayRefreshDb(ExternalObjectLog log) { + refreshDbInternal(log.getCatalogId(), log.getDbId(), log.isInvalidCache()); + } + + private void refreshDbInternal(long catalogId, long dbId, boolean invalidCache) { + ExternalCatalog catalog = (ExternalCatalog) Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId); + ExternalDatabase db = catalog.getDbForReplay(dbId); + db.setUnInitialized(invalidCache); + LOG.info("refresh database {} in catalog {} with invalidCache {}", db.getFullName(), catalog.getName(), + invalidCache); + } - // Process external catalog db refresh - refreshExternalCtlDb(dbName, catalog, stmt.isInvalidCache()); - LOG.info("Successfully refresh db: {}", dbName); + // Refresh table + public void handleRefreshTable(RefreshTableStmt stmt) throws UserException { + String catalogName = stmt.getCtl(); + String dbName = stmt.getDbName(); + String tableName = stmt.getTblName(); + refreshTable(catalogName, dbName, tableName, false); } - private void refreshExternalCtlDb(String dbName, CatalogIf catalog, boolean invalidCache) throws DdlException { + public void refreshTable(String catalogName, String dbName, String tableName, boolean ignoreIfNotExists) + throws DdlException { + Env env = Env.getCurrentEnv(); + CatalogIf catalog = catalogName != null ? env.getCatalogMgr().getCatalog(catalogName) : env.getCurrentCatalog(); + if (catalog == null) { + throw new DdlException("Catalog " + catalogName + " doesn't exist."); + } if (!(catalog instanceof ExternalCatalog)) { - throw new DdlException("Only support refresh ExternalCatalog Database"); + throw new DdlException("Only support refresh ExternalCatalog Tables"); } DatabaseIf db = catalog.getDbNullable(dbName); if (db == null) { - throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName()); + if (!ignoreIfNotExists) { + throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName()); + } + return; } - ((ExternalDatabase) db).setUnInitialized(invalidCache); + + TableIf table = db.getTableNullable(tableName); + if (table == null) { + if (!ignoreIfNotExists) { + throw new DdlException("Table " + tableName + " does not exist in db " + dbName); + } + return; + } + refreshTableInternal(catalog, db, table, 0); + ExternalObjectLog log = new ExternalObjectLog(); log.setCatalogId(catalog.getId()); log.setDbId(db.getId()); - log.setInvalidCache(invalidCache); - Env.getCurrentEnv().getEditLog().logRefreshExternalDb(log); + log.setTableId(table.getId()); + Env.getCurrentEnv().getEditLog().logRefreshExternalTable(log); + } + + public void replayRefreshTable(ExternalObjectLog log) { + ExternalCatalog catalog = (ExternalCatalog) Env.getCurrentEnv().getCatalogMgr().getCatalog(log.getCatalogId()); + if (catalog == null) { + LOG.warn("failed to find catalog replaying refresh table {}", log.getCatalogId()); + return; + } + ExternalDatabase db = catalog.getDbForReplay(log.getDbId()); + TableIf table = db.getTableForReplay(log.getTableId()); + refreshTableInternal(catalog, db, table, log.getLastUpdateTime()); + } + + public void refreshExternalTableFromEvent(String catalogName, String dbName, String tableName, + long updateTime, boolean ignoreIfNotExists) throws DdlException { + CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogName); + if (catalog == null) { + throw new DdlException("No catalog found with name: " + catalogName); + } + if (!(catalog instanceof ExternalCatalog)) { + throw new DdlException("Only support refresh ExternalCatalog Tables"); + } + DatabaseIf db = catalog.getDbNullable(dbName); + if (db == null) { + if (!ignoreIfNotExists) { + throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName()); + } + return; + } + + TableIf table = db.getTableNullable(tableName); + if (table == null) { + if (!ignoreIfNotExists) { + throw new DdlException("Table " + tableName + " does not exist in db " + dbName); + } + return; + } + refreshTableInternal(catalog, db, table, updateTime); + } + + private void refreshTableInternal(CatalogIf catalog, DatabaseIf db, TableIf table, long updateTime) { + if (table instanceof ExternalTable) { + ((ExternalTable) table).unsetObjectCreated(); + } + Env.getCurrentEnv().getExtMetaCacheMgr() + .invalidateTableCache(catalog.getId(), db.getFullName(), table.getName()); + if (table instanceof HMSExternalTable && updateTime > 0) { + ((HMSExternalTable) table).setEventUpdateTime(updateTime); + } + LOG.info("refresh table {} from db {} in catalog {}", table.getName(), db.getFullName(), catalog.getName()); + } + + // Refresh partition + public void refreshPartitions(String catalogName, String dbName, String tableName, + List<String> partitionNames, long updateTime, boolean ignoreIfNotExists) + throws DdlException { + CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogName); + if (catalog == null) { + if (!ignoreIfNotExists) { + throw new DdlException("No catalog found with name: " + catalogName); + } + return; + } + if (!(catalog instanceof ExternalCatalog)) { + throw new DdlException("Only support ExternalCatalog"); + } + DatabaseIf db = catalog.getDbNullable(dbName); + if (db == null) { + if (!ignoreIfNotExists) { + throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName()); + } + return; + } + + TableIf table = db.getTableNullable(tableName); + if (table == null) { + if (!ignoreIfNotExists) { + throw new DdlException("Table " + tableName + " does not exist in db " + dbName); + } + return; + } + + Env.getCurrentEnv().getExtMetaCacheMgr().invalidatePartitionsCache( + catalog.getId(), db.getFullName(), table.getName(), partitionNames); + ((HMSExternalTable) table).setEventUpdateTime(updateTime); } public void addToRefreshMap(long catalogId, Integer[] sec) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java index 0a943441b68..e411e992e97 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java @@ -1386,7 +1386,7 @@ public class PropertyAnalyzer { || properties.containsKey(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM))) { return properties; } - CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalogNullable(ctl); + CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(ctl); if (catalog == null) { return properties; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java index c0d2ca77853..56b390bc7d3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java @@ -22,7 +22,6 @@ import org.apache.doris.analysis.AlterCatalogNameStmt; import org.apache.doris.analysis.AlterCatalogPropertyStmt; import org.apache.doris.analysis.CreateCatalogStmt; import org.apache.doris.analysis.DropCatalogStmt; -import org.apache.doris.analysis.RefreshCatalogStmt; import org.apache.doris.analysis.ShowCatalogStmt; import org.apache.doris.analysis.ShowCreateCatalogStmt; import org.apache.doris.catalog.DatabaseIf; @@ -152,16 +151,6 @@ public class CatalogMgr implements Writable, GsonPostProcessable { return catalog; } - private void unprotectedRefreshCatalog(long catalogId, boolean invalidCache) { - CatalogIf catalog = idToCatalog.get(catalogId); - if (catalog != null) { - String catalogName = catalog.getName(); - if (!catalogName.equals(InternalCatalog.INTERNAL_CATALOG_NAME)) { - ((ExternalCatalog) catalog).onRefresh(invalidCache); - } - } - } - public InternalCatalog getInternalCatalog() { return internalCatalog; } @@ -366,13 +355,6 @@ public class CatalogMgr implements Writable, GsonPostProcessable { } } - /** - * Get catalog, or null if not exists. - */ - public CatalogIf getCatalogNullable(String catalogName) { - return nameToCatalog.get(catalogName); - } - /** * List all catalog or get the special catalog with a name. */ @@ -481,28 +463,6 @@ public class CatalogMgr implements Writable, GsonPostProcessable { return new ShowResultSet(showStmt.getMetaData(), rows); } - /** - * Refresh the catalog meta and write the meta log. - */ - public void refreshCatalog(RefreshCatalogStmt stmt) throws UserException { - CatalogIf catalog = nameToCatalog.get(stmt.getCatalogName()); - if (catalog == null) { - throw new DdlException("No catalog found with name: " + stmt.getCatalogName()); - } - CatalogLog log = CatalogFactory.createCatalogLog(catalog.getId(), stmt); - refreshCatalog(log); - } - - public void refreshCatalog(CatalogLog log) { - writeLock(); - try { - replayRefreshCatalog(log); - Env.getCurrentEnv().getEditLog().logCatalogLog(OperationType.OP_REFRESH_CATALOG, log); - } finally { - writeUnlock(); - } - } - /** * Reply for create catalog event. */ @@ -543,18 +503,6 @@ public class CatalogMgr implements Writable, GsonPostProcessable { } } - /** - * Reply for refresh catalog event. - */ - public void replayRefreshCatalog(CatalogLog log) { - writeLock(); - try { - unprotectedRefreshCatalog(log.getCatalogId(), log.isInvalidCache()); - } finally { - writeUnlock(); - } - } - /** * Reply for alter catalog name event. */ @@ -646,108 +594,6 @@ public class CatalogMgr implements Writable, GsonPostProcessable { db.replayInitDb(log, catalog); } - public void replayRefreshExternalDb(ExternalObjectLog log) { - writeLock(); - try { - ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId()); - ExternalDatabase db = catalog.getDbForReplay(log.getDbId()); - db.setUnInitialized(log.isInvalidCache()); - } finally { - writeUnlock(); - } - } - - public void refreshExternalTableFromEvent(String dbName, String tableName, String catalogName, - long updateTime, boolean ignoreIfNotExists) throws DdlException { - CatalogIf catalog = nameToCatalog.get(catalogName); - if (catalog == null) { - throw new DdlException("No catalog found with name: " + catalogName); - } - if (!(catalog instanceof ExternalCatalog)) { - throw new DdlException("Only support refresh ExternalCatalog Tables"); - } - DatabaseIf db = catalog.getDbNullable(dbName); - if (db == null) { - if (!ignoreIfNotExists) { - throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName()); - } - return; - } - - TableIf table = db.getTableNullable(tableName); - if (table == null) { - if (!ignoreIfNotExists) { - throw new DdlException("Table " + tableName + " does not exist in db " + dbName); - } - return; - } - if (!(table instanceof HMSExternalTable)) { - return; - } - ((HMSExternalTable) table).unsetObjectCreated(); - ((HMSExternalTable) table).setEventUpdateTime(updateTime); - Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(catalog.getId(), dbName, tableName); - } - - public void refreshExternalTable(String dbName, String tableName, String catalogName, boolean ignoreIfNotExists) - throws DdlException { - CatalogIf catalog = nameToCatalog.get(catalogName); - if (catalog == null) { - throw new DdlException("No catalog found with name: " + catalogName); - } - if (!(catalog instanceof ExternalCatalog)) { - throw new DdlException("Only support refresh ExternalCatalog Tables"); - } - DatabaseIf db = catalog.getDbNullable(dbName); - if (db == null) { - if (!ignoreIfNotExists) { - throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName()); - } - return; - } - - TableIf table = db.getTableNullable(tableName); - if (table == null) { - if (!ignoreIfNotExists) { - throw new DdlException("Table " + tableName + " does not exist in db " + dbName); - } - return; - } - if (table instanceof ExternalTable) { - ((ExternalTable) table).unsetObjectCreated(); - } - Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(catalog.getId(), dbName, tableName); - ExternalObjectLog log = new ExternalObjectLog(); - log.setCatalogId(catalog.getId()); - log.setDbId(db.getId()); - log.setTableId(table.getId()); - Env.getCurrentEnv().getEditLog().logRefreshExternalTable(log); - } - - public void replayRefreshExternalTable(ExternalObjectLog log) { - ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId()); - if (catalog == null) { - LOG.warn("No catalog found with id:[{}], it may have been dropped.", log.getCatalogId()); - return; - } - ExternalDatabase db = catalog.getDbForReplay(log.getDbId()); - if (db == null) { - LOG.warn("No db found with id:[{}], it may have been dropped.", log.getDbId()); - return; - } - ExternalTable table = db.getTableForReplay(log.getTableId()); - if (table == null) { - LOG.warn("No table found with id:[{}], it may have been dropped.", log.getTableId()); - return; - } - table.unsetObjectCreated(); - Env.getCurrentEnv().getExtMetaCacheMgr() - .invalidateTableCache(catalog.getId(), db.getFullName(), table.getName()); - if (table instanceof HMSExternalTable && log.getLastUpdateTime() > 0) { - ((HMSExternalTable) table).setEventUpdateTime(log.getLastUpdateTime()); - } - } - public void unregisterExternalTable(String dbName, String tableName, String catalogName, boolean ignoreIfExists) throws DdlException { CatalogIf<?> catalog = nameToCatalog.get(catalogName); @@ -944,40 +790,6 @@ public class CatalogMgr implements Writable, GsonPostProcessable { hmsTable.setEventUpdateTime(updateTime); } - public void refreshExternalPartitions(String catalogName, String dbName, String tableName, - List<String> partitionNames, long updateTime, boolean ignoreIfNotExists) - throws DdlException { - CatalogIf catalog = nameToCatalog.get(catalogName); - if (catalog == null) { - if (!ignoreIfNotExists) { - throw new DdlException("No catalog found with name: " + catalogName); - } - return; - } - if (!(catalog instanceof ExternalCatalog)) { - throw new DdlException("Only support ExternalCatalog"); - } - DatabaseIf db = catalog.getDbNullable(dbName); - if (db == null) { - if (!ignoreIfNotExists) { - throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName()); - } - return; - } - - TableIf table = db.getTableNullable(tableName); - if (table == null) { - if (!ignoreIfNotExists) { - throw new DdlException("Table " + tableName + " does not exist in db " + dbName); - } - return; - } - - Env.getCurrentEnv().getExtMetaCacheMgr().invalidatePartitionsCache( - catalog.getId(), db.getFullName(), table.getName(), partitionNames); - ((HMSExternalTable) table).setEventUpdateTime(updateTime); - } - public void registerCatalogRefreshListener(Env env) { for (CatalogIf catalog : idToCatalog.values()) { Map<String, String> properties = catalog.getProperties(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java index 4ebb137a607..7729ede264a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java @@ -68,5 +68,4 @@ public class ExternalObjectLog implements Writable { String json = Text.readString(in); return GsonUtils.GSON.fromJson(json, ExternalObjectLog.class); } - } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java index 4085901de9a..6be0215f143 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java @@ -119,8 +119,8 @@ public class AlterPartitionEvent extends MetastorePartitionEvent { .addExternalPartitions(catalogName, dbName, tblName, Lists.newArrayList(partitionNameAfter), eventTime, true); } else { - Env.getCurrentEnv().getCatalogMgr() - .refreshExternalPartitions(catalogName, dbName, hmsTbl.getTableName(), + Env.getCurrentEnv().getRefreshManager() + .refreshPartitions(catalogName, dbName, hmsTbl.getTableName(), Lists.newArrayList(partitionNameAfter), eventTime, true); } } catch (DdlException e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java index 1567960b7f5..78c4eac888d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java @@ -156,9 +156,9 @@ public class AlterTableEvent extends MetastoreTableEvent { return; } //The scope of refresh can be narrowed in the future - Env.getCurrentEnv().getCatalogMgr() - .refreshExternalTableFromEvent(tableBefore.getDbName(), tableBefore.getTableName(), - catalogName, eventTime, true); + Env.getCurrentEnv().getRefreshManager() + .refreshExternalTableFromEvent(catalogName, tableBefore.getDbName(), tableBefore.getTableName(), + eventTime, true); } catch (Exception e) { throw new MetastoreNotificationException( debugString("Failed to process event"), e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/InsertEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/InsertEvent.java index 1540ca5ea3e..fcecbabfcc1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/InsertEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/InsertEvent.java @@ -79,12 +79,12 @@ public class InsertEvent extends MetastoreTableEvent { * Only when we use hive client to execute a `INSERT INTO TBL SELECT * ...` or `INSERT INTO TBL ...` sql * to a non-partitioned table then the hms will generate an insert event, and there is not * any partition event occurs, but the file cache may has been changed, so we need handle this. - * Currently {@link org.apache.doris.datasource.CatalogMgr#refreshExternalTable} do not invalidate + * Currently {@link org.apache.doris.catalog.RefreshManager#refreshTable()} do not invalidate * the file cache of this table, * but <a href="https://github.com/apache/doris/pull/17932">this PR</a> has fixed it. */ - Env.getCurrentEnv().getCatalogMgr().refreshExternalTableFromEvent(dbName, tblName, - catalogName, eventTime, true); + Env.getCurrentEnv().getRefreshManager().refreshExternalTableFromEvent(catalogName, dbName, tblName, + eventTime, true); } catch (DdlException e) { throw new MetastoreNotificationException( debugString("Failed to process event"), e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java index 5ca94866bb4..6e12c35e2b8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java @@ -308,7 +308,7 @@ public class MetastoreEventsProcessor extends MasterDaemon { CatalogLog log = new CatalogLog(); log.setCatalogId(hmsExternalCatalog.getId()); log.setInvalidCache(true); - Env.getCurrentEnv().getCatalogMgr().replayRefreshCatalog(log); + Env.getCurrentEnv().getRefreshManager().replayRefreshCatalog(log); } private void refreshCatalogForSlave(HMSExternalCatalog hmsExternalCatalog) throws Exception { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index 979f5535a22..42560575b14 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -276,9 +276,8 @@ public class MTMVTask extends AbstractTask { TableIf tableIf = MTMVUtil.getTable(tableInfo); if (tableIf instanceof HMSExternalTable) { HMSExternalTable hmsTable = (HMSExternalTable) tableIf; - Env.getCurrentEnv().getCatalogMgr() - .refreshExternalTable(hmsTable.getDbName(), hmsTable.getName(), hmsTable.getCatalog().getName(), - true); + Env.getCurrentEnv().getRefreshManager() + .refreshTable(hmsTable.getCatalog().getName(), hmsTable.getDbName(), hmsTable.getName(), true); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlProto.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlProto.java index e12ed336f41..babc19b33bd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlProto.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlProto.java @@ -328,7 +328,7 @@ public class MysqlProto { // check catalog and db exists if (catalogName != null) { - CatalogIf catalogIf = context.getEnv().getCatalogMgr().getCatalogNullable(catalogName); + CatalogIf catalogIf = context.getEnv().getCatalogMgr().getCatalog(catalogName); if (catalogIf == null) { context.getState().setError(ErrorCode.ERR_BAD_DB_ERROR, "No match catalog in doris: " + db); return false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java index 116a04215d8..16236c340a6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java @@ -113,10 +113,10 @@ public class HiveInsertExecutor extends AbstractInsertExecutor { transactionManager.commit(txnId); summaryProfile.ifPresent(SummaryProfile::setTransactionEndTime); txnStatus = TransactionStatus.COMMITTED; - Env.getCurrentEnv().getCatalogMgr().refreshExternalTable( + Env.getCurrentEnv().getRefreshManager().refreshTable( + catalogName, dbName, tbName, - catalogName, true); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index ffae17c6b1c..e7491e9d478 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -948,7 +948,7 @@ public class EditLog { } case OperationType.OP_REFRESH_CATALOG: { CatalogLog log = (CatalogLog) journal.getData(); - env.getCatalogMgr().replayRefreshCatalog(log); + env.getRefreshManager().replayRefreshCatalog(log); break; } case OperationType.OP_MODIFY_TABLE_LIGHT_SCHEMA_CHANGE: { @@ -1014,7 +1014,7 @@ public class EditLog { } case OperationType.OP_REFRESH_EXTERNAL_DB: { final ExternalObjectLog log = (ExternalObjectLog) journal.getData(); - env.getCatalogMgr().replayRefreshExternalDb(log); + env.getRefreshManager().replayRefreshDb(log); break; } case OperationType.OP_INIT_EXTERNAL_DB: { @@ -1024,7 +1024,7 @@ public class EditLog { } case OperationType.OP_REFRESH_EXTERNAL_TABLE: { final ExternalObjectLog log = (ExternalObjectLog) journal.getData(); - env.getCatalogMgr().replayRefreshExternalTable(log); + env.getRefreshManager().replayRefreshTable(log); break; } case OperationType.OP_DROP_EXTERNAL_TABLE: { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java index a6d6c42b3e3..34beb3c8115 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -124,7 +124,7 @@ public abstract class ConnectProcessor { // check catalog and db exists if (catalogName != null) { - CatalogIf catalogIf = ctx.getEnv().getCatalogMgr().getCatalogNullable(catalogName); + CatalogIf catalogIf = ctx.getEnv().getCatalogMgr().getCatalog(catalogName); if (catalogIf == null) { ctx.getState().setError(ErrorCode.ERR_BAD_DB_ERROR, "No match catalog in doris: " + fullDbName); return; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java index 35632c5e430..6f2d5e01244 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java @@ -367,7 +367,7 @@ public class DdlExecutor { } else if (ddlStmt instanceof DropMaterializedViewStmt) { env.dropMaterializedView((DropMaterializedViewStmt) ddlStmt); } else if (ddlStmt instanceof RefreshCatalogStmt) { - env.getCatalogMgr().refreshCatalog((RefreshCatalogStmt) ddlStmt); + env.getRefreshManager().handleRefreshCatalog((RefreshCatalogStmt) ddlStmt); } else if (ddlStmt instanceof RefreshLdapStmt) { env.getAuth().refreshLdap((RefreshLdapStmt) ddlStmt); } else if (ddlStmt instanceof AlterUserStmt) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/RefreshCatalogTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/RefreshCatalogTest.java deleted file mode 100644 index a1a935f5f04..00000000000 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/RefreshCatalogTest.java +++ /dev/null @@ -1,142 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.catalog; - -import org.apache.doris.analysis.CreateCatalogStmt; -import org.apache.doris.analysis.DropCatalogStmt; -import org.apache.doris.analysis.RefreshCatalogStmt; -import org.apache.doris.common.FeConstants; -import org.apache.doris.datasource.CatalogIf; -import org.apache.doris.datasource.ExternalCatalog; -import org.apache.doris.datasource.test.TestExternalCatalog; -import org.apache.doris.datasource.test.TestExternalTable; -import org.apache.doris.qe.ConnectContext; -import org.apache.doris.qe.DdlExecutor; -import org.apache.doris.utframe.TestWithFeService; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -import java.util.List; -import java.util.Map; - -public class RefreshCatalogTest extends TestWithFeService { - private static Env env; - private ConnectContext rootCtx; - - @Override - protected void runBeforeAll() throws Exception { - FeConstants.runningUnitTest = true; - rootCtx = createDefaultCtx(); - env = Env.getCurrentEnv(); - // 1. create test catalog - CreateCatalogStmt testCatalog = (CreateCatalogStmt) parseAndAnalyzeStmt("create catalog test1 properties(\n" - + " \"type\" = \"test\",\n" - + " \"catalog_provider.class\" " - + "= \"org.apache.doris.catalog.RefreshTableTest$RefreshTableProvider\"\n" - + ");", - rootCtx); - env.getCatalogMgr().createCatalog(testCatalog); - } - - @Override - protected void runAfterAll() throws Exception { - super.runAfterAll(); - rootCtx.setThreadLocalInfo(); - DropCatalogStmt stmt = (DropCatalogStmt) parseAndAnalyzeStmt("drop catalog test1"); - env.getCatalogMgr().dropCatalog(stmt); - } - - @Test - public void testRefreshCatalog() throws Exception { - CatalogIf test1 = env.getCatalogMgr().getCatalog("test1"); - // init is 0 - long l1 = test1.getLastUpdateTime(); - Assertions.assertTrue(l1 == 0); - TestExternalTable table = (TestExternalTable) test1.getDbNullable("db1").getTable("tbl11").get(); - // getDb() triggered init method - long l2 = test1.getLastUpdateTime(); - Assertions.assertTrue(l2 > l1); - Assertions.assertFalse(table.isObjectCreated()); - table.makeSureInitialized(); - Assertions.assertTrue(table.isObjectCreated()); - RefreshCatalogStmt refreshCatalogStmt = new RefreshCatalogStmt("test1", null); - Assertions.assertTrue(refreshCatalogStmt.isInvalidCache()); - try { - DdlExecutor.execute(Env.getCurrentEnv(), refreshCatalogStmt); - } catch (Exception e) { - // Do nothing - } - // not triggered init method - long l3 = test1.getLastUpdateTime(); - Assertions.assertTrue(l3 == l2); - Assertions.assertTrue(table.isObjectCreated()); - test1.getDbNullable("db1").getTables(); - Assertions.assertFalse(table.isObjectCreated()); - try { - DdlExecutor.execute(Env.getCurrentEnv(), refreshCatalogStmt); - } catch (Exception e) { - // Do nothing - } - Assertions.assertFalse(((ExternalCatalog) test1).isInitialized()); - table.makeSureInitialized(); - Assertions.assertTrue(((ExternalCatalog) test1).isInitialized()); - // table.makeSureInitialized() triggered init method - long l4 = test1.getLastUpdateTime(); - Assertions.assertTrue(l4 > l3); - try { - DdlExecutor.execute(Env.getCurrentEnv(), refreshCatalogStmt); - } catch (Exception e) { - // Do nothing - } - Assertions.assertFalse(((ExternalCatalog) test1).isInitialized()); - } - - public static class RefreshTableProvider implements TestExternalCatalog.TestCatalogProvider { - public static final Map<String, Map<String, List<Column>>> MOCKED_META; - - static { - MOCKED_META = Maps.newHashMap(); - Map<String, List<Column>> tblSchemaMap1 = Maps.newHashMap(); - // db1 - tblSchemaMap1.put("tbl11", Lists.newArrayList( - new Column("a11", PrimitiveType.BIGINT), - new Column("a12", PrimitiveType.STRING), - new Column("a13", PrimitiveType.FLOAT))); - tblSchemaMap1.put("tbl12", Lists.newArrayList( - new Column("b21", PrimitiveType.BIGINT), - new Column("b22", PrimitiveType.STRING), - new Column("b23", PrimitiveType.FLOAT))); - MOCKED_META.put("db1", tblSchemaMap1); - // db2 - Map<String, List<Column>> tblSchemaMap2 = Maps.newHashMap(); - tblSchemaMap2.put("tbl21", Lists.newArrayList( - new Column("c11", PrimitiveType.BIGINT), - new Column("c12", PrimitiveType.STRING), - new Column("c13", PrimitiveType.FLOAT))); - MOCKED_META.put("db2", tblSchemaMap2); - } - - @Override - public Map<String, Map<String, List<Column>>> getMetadata() { - return MOCKED_META; - } - } -} diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java index 0cdd0db8ae3..26c1f5d7664 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java @@ -19,6 +19,7 @@ package org.apache.doris.datasource; import org.apache.doris.analysis.CreateCatalogStmt; import org.apache.doris.analysis.DropCatalogStmt; +import org.apache.doris.analysis.RefreshCatalogStmt; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.InfoSchemaDb; @@ -29,9 +30,11 @@ import org.apache.doris.datasource.infoschema.ExternalInfoSchemaDatabase; import org.apache.doris.datasource.infoschema.ExternalMysqlDatabase; import org.apache.doris.datasource.test.TestExternalCatalog; import org.apache.doris.datasource.test.TestExternalDatabase; +import org.apache.doris.datasource.test.TestExternalTable; import org.apache.doris.mysql.privilege.Auth; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.DdlExecutor; import org.apache.doris.utframe.TestWithFeService; import com.google.common.collect.Lists; @@ -65,6 +68,15 @@ public class RefreshCatalogTest extends TestWithFeService { + ");", rootCtx); env.getCatalogMgr().createCatalog(testCatalog); + + // 2. create test2 catalog + testCatalog = (CreateCatalogStmt) parseAndAnalyzeStmt("create catalog test2 properties(\n" + + " \"type\" = \"test\",\n" + + " \"catalog_provider.class\" " + + "= \"org.apache.doris.datasource.RefreshCatalogTest$RefreshCatalogProvider2\"\n" + + ");", + rootCtx); + env.getCatalogMgr().createCatalog(testCatalog); } @Override @@ -73,6 +85,8 @@ public class RefreshCatalogTest extends TestWithFeService { rootCtx.setThreadLocalInfo(); DropCatalogStmt stmt = (DropCatalogStmt) parseAndAnalyzeStmt("drop catalog test1"); env.getCatalogMgr().dropCatalog(stmt); + stmt = (DropCatalogStmt) parseAndAnalyzeStmt("drop catalog test2"); + env.getCatalogMgr().dropCatalog(stmt); } @Test @@ -107,6 +121,51 @@ public class RefreshCatalogTest extends TestWithFeService { Assertions.assertEquals(2, mysqlDb.getTables().size()); } + @Test + public void testRefreshCatalogLastUpdateTime() throws Exception { + CatalogIf test2 = env.getCatalogMgr().getCatalog("test2"); + // init is 0 + long l1 = test2.getLastUpdateTime(); + Assertions.assertTrue(l1 == 0); + TestExternalTable table = (TestExternalTable) test2.getDbNullable("db1").getTable("tbl11").get(); + // getDb() triggered init method + long l2 = test2.getLastUpdateTime(); + Assertions.assertTrue(l2 > l1); + Assertions.assertFalse(table.isObjectCreated()); + table.makeSureInitialized(); + Assertions.assertTrue(table.isObjectCreated()); + RefreshCatalogStmt refreshCatalogStmt = new RefreshCatalogStmt("test2", null); + Assertions.assertTrue(refreshCatalogStmt.isInvalidCache()); + try { + DdlExecutor.execute(Env.getCurrentEnv(), refreshCatalogStmt); + } catch (Exception e) { + // Do nothing + } + // not triggered init method + long l3 = test2.getLastUpdateTime(); + Assertions.assertTrue(l3 == l2); + Assertions.assertTrue(table.isObjectCreated()); + test2.getDbNullable("db1").getTables(); + Assertions.assertFalse(table.isObjectCreated()); + try { + DdlExecutor.execute(Env.getCurrentEnv(), refreshCatalogStmt); + } catch (Exception e) { + // Do nothing + } + Assertions.assertFalse(((ExternalCatalog) test2).isInitialized()); + table.makeSureInitialized(); + Assertions.assertTrue(((ExternalCatalog) test2).isInitialized()); + // table.makeSureInitialized() triggered init method + long l4 = test2.getLastUpdateTime(); + Assertions.assertTrue(l4 > l3); + try { + DdlExecutor.execute(Env.getCurrentEnv(), refreshCatalogStmt); + } catch (Exception e) { + // Do nothing + } + Assertions.assertFalse(((ExternalCatalog) test2).isInitialized()); + } + public static class RefreshCatalogProvider implements TestExternalCatalog.TestCatalogProvider { public static final Map<String, Map<String, List<Column>>> MOCKED_META; @@ -147,4 +206,35 @@ public class RefreshCatalogTest extends TestWithFeService { MOCKED_META.put("db3", tblSchemaMap3); } } + + public static class RefreshCatalogProvider2 implements TestExternalCatalog.TestCatalogProvider { + public static final Map<String, Map<String, List<Column>>> MOCKED_META; + + static { + MOCKED_META = Maps.newHashMap(); + Map<String, List<Column>> tblSchemaMap1 = Maps.newHashMap(); + // db1 + tblSchemaMap1.put("tbl11", Lists.newArrayList( + new Column("a11", PrimitiveType.BIGINT), + new Column("a12", PrimitiveType.STRING), + new Column("a13", PrimitiveType.FLOAT))); + tblSchemaMap1.put("tbl12", Lists.newArrayList( + new Column("b21", PrimitiveType.BIGINT), + new Column("b22", PrimitiveType.STRING), + new Column("b23", PrimitiveType.FLOAT))); + MOCKED_META.put("db1", tblSchemaMap1); + // db2 + Map<String, List<Column>> tblSchemaMap2 = Maps.newHashMap(); + tblSchemaMap2.put("tbl21", Lists.newArrayList( + new Column("c11", PrimitiveType.BIGINT), + new Column("c12", PrimitiveType.STRING), + new Column("c13", PrimitiveType.FLOAT))); + MOCKED_META.put("db2", tblSchemaMap2); + } + + @Override + public Map<String, Map<String, List<Column>>> getMetadata() { + return MOCKED_META; + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
