github-actions[bot] commented on code in PR #60937:
URL: https://github.com/apache/doris/pull/60937#discussion_r2887168889


##########
fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java:
##########
@@ -162,184 +165,341 @@ public ExecutorService getScheduleExecutor() {
         return scheduleExecutor;
     }
 
-    public HiveMetaStoreCache getMetaStoreCache(HMSExternalCatalog catalog) {
-        return hiveMetaStoreCacheMgr.getCache(catalog);
+    public ExternalMetaCache engine(String engine) {
+        Objects.requireNonNull(engine, "engine is null");
+        String normalizedEngine = normalizeEngineName(engine);
+        ExternalMetaCache found = engineCaches.get(normalizedEngine);
+        if (found != null) {
+            return found;
+        }
+        throw new IllegalArgumentException(
+                String.format("unsupported external meta cache engine '%s'", 
normalizedEngine));
     }
 
-    public ExternalSchemaCache getSchemaCache(ExternalCatalog catalog) {
-        return schemaCacheMgr.getCache(catalog);
+    public void prepareCatalog(long catalogId) {
+        Map<String, String> catalogProperties = 
getCatalogProperties(catalogId);
+        routeCatalogEngines(catalogId, cache -> cache.initCatalog(catalogId, 
catalogProperties));
     }
 
-    public HudiPartitionProcessor getHudiPartitionProcess(ExternalCatalog 
catalog) {
-        return hudiMetadataCacheMgr.getPartitionProcessor(catalog);
+    public void prepareCatalogByEngine(long catalogId, String engine) {
+        prepareCatalogByEngine(catalogId, engine, 
getCatalogProperties(catalogId));
     }
 
-    public HudiCachedFsViewProcessor getFsViewProcessor(ExternalCatalog 
catalog) {
-        return hudiMetadataCacheMgr.getFsViewProcessor(catalog);
+    public void prepareCatalogByEngine(long catalogId, String engine, 
Map<String, String> catalogProperties) {
+        Map<String, String> safeCatalogProperties = catalogProperties == null
+                ? Maps.newHashMap()
+                : Maps.newHashMap(catalogProperties);
+        routeSpecifiedEngine(engine, cache -> cache.initCatalog(catalogId, 
safeCatalogProperties));
     }
 
-    public HudiCachedMetaClientProcessor 
getMetaClientProcessor(ExternalCatalog catalog) {
-        return hudiMetadataCacheMgr.getHudiMetaClientProcessor(catalog);
+    public void invalidateCatalog(long catalogId) {
+        routeCatalogEngines(catalogId, cache -> safeInvalidate(
+                cache, catalogId, "invalidateCatalog",
+                () -> cache.invalidateCatalogEntries(catalogId)));
     }
 
-    public HudiMetadataCacheMgr getHudiMetadataCacheMgr() {
-        return hudiMetadataCacheMgr;
+    public void invalidateCatalogEntries(long catalogId) {
+        invalidateCatalog(catalogId);
     }
 
-    public IcebergMetadataCache getIcebergMetadataCache(ExternalCatalog 
catalog) {
-        return icebergMetadataCacheMgr.getCache(catalog);
+    public void invalidateCatalogByEngine(long catalogId, String engine) {
+        routeSpecifiedEngine(engine, cache -> safeInvalidate(
+                cache, catalogId, "invalidateCatalogByEngine",
+                () -> cache.invalidateCatalogEntries(catalogId)));
     }
 
-    public PaimonMetadataCache getPaimonMetadataCache(ExternalCatalog catalog) 
{
-        return paimonMetadataCacheMgr.getCache(catalog);
+    public void removeCatalog(long catalogId) {
+        routeCatalogEngines(catalogId, cache -> safeInvalidate(
+                cache, catalogId, "removeCatalog",
+                () -> cache.invalidateCatalog(catalogId)));
     }
 
-    public MaxComputeMetadataCache getMaxComputeMetadataCache(long catalogId) {
-        return 
maxComputeMetadataCacheMgr.getMaxComputeMetadataCache(catalogId);
+    public void removeCatalogByEngine(long catalogId, String engine) {
+        routeSpecifiedEngine(engine, cache -> safeInvalidate(
+                cache, catalogId, "removeCatalogByEngine",
+                () -> cache.invalidateCatalog(catalogId)));
     }
 
-    public FileSystemCache getFsCache() {
-        return fsCache;
+    public void invalidateDb(long catalogId, String dbName) {
+        String normalizedDbName = ClusterNamespace.getNameFromFullName(dbName);
+        routeCatalogEngines(catalogId, cache -> safeInvalidate(
+                cache, catalogId, "invalidateDb", () -> 
cache.invalidateDb(catalogId, normalizedDbName)));
     }
 
-    public ExternalRowCountCache getRowCountCache() {
-        return rowCountCache;
+    public void invalidateTable(long catalogId, String dbName, String 
tableName) {
+        String normalizedDbName = ClusterNamespace.getNameFromFullName(dbName);
+        routeCatalogEngines(catalogId, cache -> safeInvalidate(
+                cache, catalogId, "invalidateTable",
+                () -> cache.invalidateTable(catalogId, normalizedDbName, 
tableName)));
+    }
+
+    public void invalidateTableByEngine(long catalogId, String engine, String 
dbName, String tableName) {
+        String normalizedDbName = ClusterNamespace.getNameFromFullName(dbName);
+        routeSpecifiedEngine(engine, cache -> safeInvalidate(
+                cache, catalogId, "invalidateTableByEngine",
+                () -> cache.invalidateTable(catalogId, normalizedDbName, 
tableName)));
+    }
+
+    public void invalidatePartitions(long catalogId,
+            String dbName, String tableName, List<String> partitions) {
+        String normalizedDbName = ClusterNamespace.getNameFromFullName(dbName);
+        routeCatalogEngines(catalogId, cache -> safeInvalidate(
+                cache, catalogId, "invalidatePartitions",
+                () -> cache.invalidatePartitions(catalogId, normalizedDbName, 
tableName, partitions)));
     }
 
-    public DorisExternalMetaCacheMgr getDorisExternalMetaCacheMgr() {
-        return dorisExternalMetaCacheMgr;
+    public Map<String, Map<String, String>> getCatalogCacheStats(long 
catalogId) {
+        Map<String, Map<String, String>> stats = Maps.newHashMap();
+        engineCaches.forEach((engineName, externalMetaCache) -> 
externalMetaCache.stats(catalogId)
+                .forEach((entryName, entryStats) -> stats.put(engineName + "." 
+ entryName, entryStats)));
+        return stats;
     }
 
-    public void removeCache(long catalogId) {
-        if (hiveMetaStoreCacheMgr.removeCache(catalogId) != null) {
-            LOG.info("remove hive metastore cache for catalog {}", catalogId);
+    private void initEngineCaches() {
+        registerBuiltinEngineCaches();
+        loadSpiEngineCaches();
+    }
+
+    private void registerBuiltinEngineCaches() {
+        registerEngineCache(new DefaultExternalMetaCache(ENGINE_DEFAULT, 
commonRefreshExecutor), "builtin");
+        registerEngineCache(new HiveExternalMetaCache(commonRefreshExecutor, 
fileListingExecutor), "builtin");
+        registerEngineCache(new HudiExternalMetaCache(commonRefreshExecutor), 
"builtin");
+        registerEngineCache(new 
IcebergExternalMetaCache(commonRefreshExecutor), "builtin");
+        registerEngineCache(new 
PaimonExternalMetaCache(commonRefreshExecutor), "builtin");
+        registerEngineCache(new 
MaxComputeExternalMetaCache(commonRefreshExecutor), "builtin");
+        registerEngineCache(new DorisExternalMetaCache(commonRefreshExecutor), 
"builtin");
+    }
+
+    private void loadSpiEngineCaches() {
+        
loadSpiEngineCaches(ServiceLoader.load(ExternalMetaCacheFactory.class), 
"classpath");
+        try {
+            
loadSpiEngineCaches(ClassLoaderUtils.loadServicesFromDirectory(ExternalMetaCacheFactory.class),
+                    Config.external_meta_cache_plugins_dir);
+        } catch (IOException e) {
+            LOG.warn("failed to load external meta cache SPI factories from 
directory {}",
+                    Config.external_meta_cache_plugins_dir, e);
         }
-        if (schemaCacheMgr.removeCache(catalogId) != null) {
-            LOG.info("remove schema cache for catalog {}", catalogId);
+    }
+
+    private void loadSpiEngineCaches(Iterable<ExternalMetaCacheFactory> 
factories, String source) {
+        for (ExternalMetaCacheFactory factory : factories) {
+            registerSpiFactory(factory, source);
         }
-        if (icebergMetadataCacheMgr.removeCache(catalogId) != null) {
-            LOG.info("remove iceberg meta cache for catalog {}", catalogId);
+    }
+
+    private void registerSpiFactory(ExternalMetaCacheFactory factory, String 
source) {
+        String engineName = normalizeEngineName(factory.engine());
+        ExternalMetaCache cache;
+        try {
+            cache = factory.create(factoryContext);
+        } catch (Exception e) {
+            LOG.warn("failed to initialize external meta cache SPI factory {} 
for engine {}",
+                    factory.getClass().getName(), engineName, e);
+            return;
         }
-        hudiMetadataCacheMgr.removeCache(catalogId);
-        maxComputeMetadataCacheMgr.removeCache(catalogId);
-        PaimonMetadataCache paimonMetadataCache = 
paimonMetadataCacheMgr.removeCache(catalogId);
-        if (paimonMetadataCache != null) {
-            paimonMetadataCache.invalidateCatalogCache(catalogId);
+        if (cache == null) {
+            LOG.warn("external meta cache SPI factory {} returns null cache 
for engine {}",
+                    factory.getClass().getName(), engineName);
+            return;
         }
-        dorisExternalMetaCacheMgr.removeCache(catalogId);
+        registerEngineCache(cache, "spi(" + source + ")");
     }
 
-    public void invalidateTableCache(ExternalTable dorisTable) {
-        ExternalSchemaCache schemaCache = 
schemaCacheMgr.getCache(dorisTable.getCatalog().getId());
-        if (schemaCache != null) {
-            schemaCache.invalidateTableCache(dorisTable);
+    private void registerEngineCache(ExternalMetaCache cache, String source) {
+        String engineName = normalizeEngineName(cache.engine());
+        ExternalMetaCache existing = engineCaches.putIfAbsent(engineName, 
cache);
+        if (existing != null) {
+            LOG.warn("skip duplicated external meta cache engine '{}' from {}, 
existing class: {}, new class: {}",
+                    engineName, source, existing.getClass().getName(), 
cache.getClass().getName());
+            return;
         }
-        HiveMetaStoreCache hiveMetaCache = 
hiveMetaStoreCacheMgr.getCache(dorisTable.getCatalog().getId());
-        if (hiveMetaCache != null) {
-            
hiveMetaCache.invalidateTableCache(dorisTable.getOrBuildNameMapping());
+        if ("builtin".equals(source)) {
+            LOG.debug("registered external meta cache engine '{}' from {}", 
engineName, source);
+        } else {
+            LOG.info("registered external meta cache engine '{}' from {}", 
engineName, source);
         }
-        IcebergMetadataCache icebergMetadataCache = 
icebergMetadataCacheMgr.getCache(dorisTable.getCatalog().getId());
-        if (icebergMetadataCache != null) {
-            icebergMetadataCache.invalidateTableCache(dorisTable);
+    }
+
+    private static String normalizeEngineName(String engine) {
+        if (engine == null) {
+            return ENGINE_DEFAULT;
         }
-        hudiMetadataCacheMgr.invalidateTableCache(dorisTable);
-        maxComputeMetadataCacheMgr.invalidateTableCache(dorisTable);
-        PaimonMetadataCache paimonMetadataCache = 
paimonMetadataCacheMgr.getCache(dorisTable.getCatalog().getId());
-        if (paimonMetadataCache != null) {
-            paimonMetadataCache.invalidateTableCache(dorisTable);
+        String normalized = engine.trim().toLowerCase(Locale.ROOT);
+        if (normalized.isEmpty()) {
+            return ENGINE_DEFAULT;
         }
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("invalid table cache for {}.{} in catalog {}", 
dorisTable.getRemoteDbName(),
-                    dorisTable.getRemoteName(), 
dorisTable.getCatalog().getName());
+        switch (normalized) {
+            case "hms":
+                return ENGINE_HIVE;
+            case "external_doris":
+                return ENGINE_DORIS;
+            case "max_compute":
+                return ENGINE_MAXCOMPUTE;
+            default:
+                return normalized;
         }
     }
 
-    public void invalidateDbCache(long catalogId, String dbName) {
-        dbName = ClusterNamespace.getNameFromFullName(dbName);
-        ExternalSchemaCache schemaCache = schemaCacheMgr.getCache(catalogId);
-        if (schemaCache != null) {
-            schemaCache.invalidateDbCache(dbName);
+    private void routeCatalogEngines(long catalogId, 
Consumer<ExternalMetaCache> action) {
+        engineCaches.values().forEach(action);
+    }
+
+    private void routeSpecifiedEngine(String engine, 
Consumer<ExternalMetaCache> action) {
+        action.accept(this.engine(engine));
+    }
+
+    private void safeInvalidate(ExternalMetaCache cache, long catalogId, 
String operation, Runnable action) {
+        try {
+            cache.checkCatalogInitialized(catalogId);
+            action.run();
+        } catch (IllegalStateException e) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("skip {} for catalog {} on engine '{}' because cache 
entry is absent",
+                        operation, catalogId, cache.engine());
+            }
+        }
+    }
+
+    private static class FactoryContext implements 
ExternalMetaCacheFactory.Context {
+        private final ExecutorService refreshExecutor;
+        private final ExecutorService fileListingExecutor;
+
+        private FactoryContext(ExecutorService refreshExecutor, 
ExecutorService fileListingExecutor) {
+            this.refreshExecutor = refreshExecutor;
+            this.fileListingExecutor = fileListingExecutor;
         }
-        HiveMetaStoreCache metaCache = 
hiveMetaStoreCacheMgr.getCache(catalogId);
-        if (metaCache != null) {
-            metaCache.invalidateDbCache(dbName);
+
+        @Override
+        public ExecutorService refreshExecutor() {
+            return refreshExecutor;
         }
-        IcebergMetadataCache icebergMetadataCache = 
icebergMetadataCacheMgr.getCache(catalogId);
-        if (icebergMetadataCache != null) {
-            icebergMetadataCache.invalidateDbCache(catalogId, dbName);
+
+        @Override
+        public ExecutorService fileListingExecutor() {
+            return fileListingExecutor;
         }
-        hudiMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
-        maxComputeMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
-        PaimonMetadataCache paimonMetadataCache = 
paimonMetadataCacheMgr.getCache(catalogId);
-        if (paimonMetadataCache != null) {
-            paimonMetadataCache.invalidateDbCache(catalogId, dbName);
+    }
+
+    private Map<String, String> getCatalogProperties(long catalogId) {
+        CatalogIf<?> catalog = getCatalog(catalogId);
+        if (catalog == null || catalog.getProperties() == null) {
+            return Maps.newHashMap();
         }
+        return Maps.newHashMap(catalog.getProperties());
+    }
+
+    @Nullable
+    private CatalogIf<?> getCatalog(long catalogId) {
+        if (Env.getCurrentEnv() == null || Env.getCurrentEnv().getCatalogMgr() 
== null) {
+            return null;
+        }
+        return Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId);
+    }
+
+    public HiveExternalMetaCache getMetaStoreCache() {
+        return getHiveExternalMetaCache();
+    }
+
+    @SuppressWarnings("unchecked")
+    public Optional<SchemaCacheValue> getSchemaCacheValue(ExternalTable table, 
SchemaCacheKey key) {
+        long catalogId = table.getCatalog().getId();
+        Class<SchemaCacheKey> keyType = (Class<SchemaCacheKey>) key.getClass();

Review Comment:
   **Bug: Type mismatch causes `IllegalArgumentException` at runtime**
   
   `key.getClass()` returns the runtime class (e.g., 
`IcebergSchemaCacheKey.class`), but the unchecked cast to 
`Class<SchemaCacheKey>` does nothing at runtime due to type erasure. This 
`keyType` is then passed to `AbstractExternalMetaCache.entry()`, which calls 
`ensureTypeCompatible()` using strict `Class.equals()` comparison.
   
   For Iceberg, the schema entry is registered with 
`IcebergSchemaCacheKey.class` (in `IcebergExternalMetaCache.java:103`), so when 
this method is called with a plain `SchemaCacheKey`, `ensureTypeCompatible` 
will throw:
   ```
   Entry 'schema' for engine 'iceberg' expects key/value types
   (IcebergSchemaCacheKey, SchemaCacheValue), but got (SchemaCacheKey, 
SchemaCacheValue)
   ```
   
   The same issue affects any engine using a `SchemaCacheKey` subclass (Hudi's 
`HudiSchemaCacheKey`, Paimon's `PaimonSchemaCacheKey`, etc.).
   
   **Currently mitigated** because `IcebergExternalTable.getFullSchema()` is 
overridden to use the engine-specific path 
(`IcebergUtils.getSchemaCacheValue()` → 
`IcebergExternalMetaCache.getIcebergSchemaCacheValue()`), bypassing this 
method. But `getSchemaCacheValue()` itself is public and not overridden on 
`IcebergExternalTable`, so any caller using the base class path would fail.
   
   **Suggested fix** — apply one of the following:
   1. Relax `ensureTypeCompatible()` to use `isAssignableFrom()` instead of strict `Class.equals()`, so subclass keys are accepted, or
   2. Ignore the runtime `key.getClass()` here and look up the registered entry definition's declared key type instead, or
   3. Override `getSchemaCacheValue()` in each engine-specific table subclass so callers are routed through the engine-specific path



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java:
##########
@@ -162,184 +165,341 @@ public ExecutorService getScheduleExecutor() {
         return scheduleExecutor;
     }
 
-    public HiveMetaStoreCache getMetaStoreCache(HMSExternalCatalog catalog) {
-        return hiveMetaStoreCacheMgr.getCache(catalog);
+    public ExternalMetaCache engine(String engine) {
+        Objects.requireNonNull(engine, "engine is null");
+        String normalizedEngine = normalizeEngineName(engine);
+        ExternalMetaCache found = engineCaches.get(normalizedEngine);
+        if (found != null) {
+            return found;
+        }
+        throw new IllegalArgumentException(
+                String.format("unsupported external meta cache engine '%s'", 
normalizedEngine));
     }
 
-    public ExternalSchemaCache getSchemaCache(ExternalCatalog catalog) {
-        return schemaCacheMgr.getCache(catalog);
+    public void prepareCatalog(long catalogId) {
+        Map<String, String> catalogProperties = 
getCatalogProperties(catalogId);
+        routeCatalogEngines(catalogId, cache -> cache.initCatalog(catalogId, 
catalogProperties));
     }
 
-    public HudiPartitionProcessor getHudiPartitionProcess(ExternalCatalog 
catalog) {
-        return hudiMetadataCacheMgr.getPartitionProcessor(catalog);
+    public void prepareCatalogByEngine(long catalogId, String engine) {
+        prepareCatalogByEngine(catalogId, engine, 
getCatalogProperties(catalogId));
     }
 
-    public HudiCachedFsViewProcessor getFsViewProcessor(ExternalCatalog 
catalog) {
-        return hudiMetadataCacheMgr.getFsViewProcessor(catalog);
+    public void prepareCatalogByEngine(long catalogId, String engine, 
Map<String, String> catalogProperties) {
+        Map<String, String> safeCatalogProperties = catalogProperties == null
+                ? Maps.newHashMap()
+                : Maps.newHashMap(catalogProperties);
+        routeSpecifiedEngine(engine, cache -> cache.initCatalog(catalogId, 
safeCatalogProperties));
     }
 
-    public HudiCachedMetaClientProcessor 
getMetaClientProcessor(ExternalCatalog catalog) {
-        return hudiMetadataCacheMgr.getHudiMetaClientProcessor(catalog);
+    public void invalidateCatalog(long catalogId) {
+        routeCatalogEngines(catalogId, cache -> safeInvalidate(
+                cache, catalogId, "invalidateCatalog",
+                () -> cache.invalidateCatalogEntries(catalogId)));
     }
 
-    public HudiMetadataCacheMgr getHudiMetadataCacheMgr() {
-        return hudiMetadataCacheMgr;
+    public void invalidateCatalogEntries(long catalogId) {
+        invalidateCatalog(catalogId);
     }
 
-    public IcebergMetadataCache getIcebergMetadataCache(ExternalCatalog 
catalog) {
-        return icebergMetadataCacheMgr.getCache(catalog);
+    public void invalidateCatalogByEngine(long catalogId, String engine) {
+        routeSpecifiedEngine(engine, cache -> safeInvalidate(
+                cache, catalogId, "invalidateCatalogByEngine",
+                () -> cache.invalidateCatalogEntries(catalogId)));
     }
 
-    public PaimonMetadataCache getPaimonMetadataCache(ExternalCatalog catalog) 
{
-        return paimonMetadataCacheMgr.getCache(catalog);
+    public void removeCatalog(long catalogId) {
+        routeCatalogEngines(catalogId, cache -> safeInvalidate(
+                cache, catalogId, "removeCatalog",
+                () -> cache.invalidateCatalog(catalogId)));
     }
 
-    public MaxComputeMetadataCache getMaxComputeMetadataCache(long catalogId) {
-        return 
maxComputeMetadataCacheMgr.getMaxComputeMetadataCache(catalogId);
+    public void removeCatalogByEngine(long catalogId, String engine) {
+        routeSpecifiedEngine(engine, cache -> safeInvalidate(
+                cache, catalogId, "removeCatalogByEngine",
+                () -> cache.invalidateCatalog(catalogId)));
     }
 
-    public FileSystemCache getFsCache() {
-        return fsCache;
+    public void invalidateDb(long catalogId, String dbName) {
+        String normalizedDbName = ClusterNamespace.getNameFromFullName(dbName);
+        routeCatalogEngines(catalogId, cache -> safeInvalidate(
+                cache, catalogId, "invalidateDb", () -> 
cache.invalidateDb(catalogId, normalizedDbName)));
     }
 
-    public ExternalRowCountCache getRowCountCache() {
-        return rowCountCache;
+    public void invalidateTable(long catalogId, String dbName, String 
tableName) {
+        String normalizedDbName = ClusterNamespace.getNameFromFullName(dbName);
+        routeCatalogEngines(catalogId, cache -> safeInvalidate(
+                cache, catalogId, "invalidateTable",
+                () -> cache.invalidateTable(catalogId, normalizedDbName, 
tableName)));
+    }
+
+    public void invalidateTableByEngine(long catalogId, String engine, String 
dbName, String tableName) {
+        String normalizedDbName = ClusterNamespace.getNameFromFullName(dbName);
+        routeSpecifiedEngine(engine, cache -> safeInvalidate(
+                cache, catalogId, "invalidateTableByEngine",
+                () -> cache.invalidateTable(catalogId, normalizedDbName, 
tableName)));
+    }
+
+    public void invalidatePartitions(long catalogId,
+            String dbName, String tableName, List<String> partitions) {
+        String normalizedDbName = ClusterNamespace.getNameFromFullName(dbName);
+        routeCatalogEngines(catalogId, cache -> safeInvalidate(
+                cache, catalogId, "invalidatePartitions",
+                () -> cache.invalidatePartitions(catalogId, normalizedDbName, 
tableName, partitions)));
     }
 
-    public DorisExternalMetaCacheMgr getDorisExternalMetaCacheMgr() {
-        return dorisExternalMetaCacheMgr;
+    public Map<String, Map<String, String>> getCatalogCacheStats(long 
catalogId) {
+        Map<String, Map<String, String>> stats = Maps.newHashMap();
+        engineCaches.forEach((engineName, externalMetaCache) -> 
externalMetaCache.stats(catalogId)
+                .forEach((entryName, entryStats) -> stats.put(engineName + "." 
+ entryName, entryStats)));
+        return stats;
     }
 
-    public void removeCache(long catalogId) {
-        if (hiveMetaStoreCacheMgr.removeCache(catalogId) != null) {
-            LOG.info("remove hive metastore cache for catalog {}", catalogId);
+    private void initEngineCaches() {
+        registerBuiltinEngineCaches();
+        loadSpiEngineCaches();
+    }
+
+    private void registerBuiltinEngineCaches() {
+        registerEngineCache(new DefaultExternalMetaCache(ENGINE_DEFAULT, 
commonRefreshExecutor), "builtin");
+        registerEngineCache(new HiveExternalMetaCache(commonRefreshExecutor, 
fileListingExecutor), "builtin");
+        registerEngineCache(new HudiExternalMetaCache(commonRefreshExecutor), 
"builtin");
+        registerEngineCache(new 
IcebergExternalMetaCache(commonRefreshExecutor), "builtin");
+        registerEngineCache(new 
PaimonExternalMetaCache(commonRefreshExecutor), "builtin");
+        registerEngineCache(new 
MaxComputeExternalMetaCache(commonRefreshExecutor), "builtin");
+        registerEngineCache(new DorisExternalMetaCache(commonRefreshExecutor), 
"builtin");
+    }
+
+    private void loadSpiEngineCaches() {
+        
loadSpiEngineCaches(ServiceLoader.load(ExternalMetaCacheFactory.class), 
"classpath");
+        try {
+            
loadSpiEngineCaches(ClassLoaderUtils.loadServicesFromDirectory(ExternalMetaCacheFactory.class),
+                    Config.external_meta_cache_plugins_dir);
+        } catch (IOException e) {
+            LOG.warn("failed to load external meta cache SPI factories from 
directory {}",
+                    Config.external_meta_cache_plugins_dir, e);
         }
-        if (schemaCacheMgr.removeCache(catalogId) != null) {
-            LOG.info("remove schema cache for catalog {}", catalogId);
+    }
+
+    private void loadSpiEngineCaches(Iterable<ExternalMetaCacheFactory> 
factories, String source) {
+        for (ExternalMetaCacheFactory factory : factories) {
+            registerSpiFactory(factory, source);
         }
-        if (icebergMetadataCacheMgr.removeCache(catalogId) != null) {
-            LOG.info("remove iceberg meta cache for catalog {}", catalogId);
+    }
+
+    private void registerSpiFactory(ExternalMetaCacheFactory factory, String 
source) {
+        String engineName = normalizeEngineName(factory.engine());
+        ExternalMetaCache cache;
+        try {
+            cache = factory.create(factoryContext);
+        } catch (Exception e) {
+            LOG.warn("failed to initialize external meta cache SPI factory {} 
for engine {}",
+                    factory.getClass().getName(), engineName, e);
+            return;
         }
-        hudiMetadataCacheMgr.removeCache(catalogId);
-        maxComputeMetadataCacheMgr.removeCache(catalogId);
-        PaimonMetadataCache paimonMetadataCache = 
paimonMetadataCacheMgr.removeCache(catalogId);
-        if (paimonMetadataCache != null) {
-            paimonMetadataCache.invalidateCatalogCache(catalogId);
+        if (cache == null) {
+            LOG.warn("external meta cache SPI factory {} returns null cache 
for engine {}",
+                    factory.getClass().getName(), engineName);
+            return;
         }
-        dorisExternalMetaCacheMgr.removeCache(catalogId);
+        registerEngineCache(cache, "spi(" + source + ")");
     }
 
-    public void invalidateTableCache(ExternalTable dorisTable) {
-        ExternalSchemaCache schemaCache = 
schemaCacheMgr.getCache(dorisTable.getCatalog().getId());
-        if (schemaCache != null) {
-            schemaCache.invalidateTableCache(dorisTable);
+    private void registerEngineCache(ExternalMetaCache cache, String source) {
+        String engineName = normalizeEngineName(cache.engine());
+        ExternalMetaCache existing = engineCaches.putIfAbsent(engineName, 
cache);
+        if (existing != null) {
+            LOG.warn("skip duplicated external meta cache engine '{}' from {}, 
existing class: {}, new class: {}",
+                    engineName, source, existing.getClass().getName(), 
cache.getClass().getName());
+            return;
         }
-        HiveMetaStoreCache hiveMetaCache = 
hiveMetaStoreCacheMgr.getCache(dorisTable.getCatalog().getId());
-        if (hiveMetaCache != null) {
-            
hiveMetaCache.invalidateTableCache(dorisTable.getOrBuildNameMapping());
+        if ("builtin".equals(source)) {
+            LOG.debug("registered external meta cache engine '{}' from {}", 
engineName, source);
+        } else {
+            LOG.info("registered external meta cache engine '{}' from {}", 
engineName, source);
         }
-        IcebergMetadataCache icebergMetadataCache = 
icebergMetadataCacheMgr.getCache(dorisTable.getCatalog().getId());
-        if (icebergMetadataCache != null) {
-            icebergMetadataCache.invalidateTableCache(dorisTable);
+    }
+
+    private static String normalizeEngineName(String engine) {
+        if (engine == null) {
+            return ENGINE_DEFAULT;
         }
-        hudiMetadataCacheMgr.invalidateTableCache(dorisTable);
-        maxComputeMetadataCacheMgr.invalidateTableCache(dorisTable);
-        PaimonMetadataCache paimonMetadataCache = 
paimonMetadataCacheMgr.getCache(dorisTable.getCatalog().getId());
-        if (paimonMetadataCache != null) {
-            paimonMetadataCache.invalidateTableCache(dorisTable);
+        String normalized = engine.trim().toLowerCase(Locale.ROOT);
+        if (normalized.isEmpty()) {
+            return ENGINE_DEFAULT;
         }
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("invalid table cache for {}.{} in catalog {}", 
dorisTable.getRemoteDbName(),
-                    dorisTable.getRemoteName(), 
dorisTable.getCatalog().getName());
+        switch (normalized) {
+            case "hms":
+                return ENGINE_HIVE;
+            case "external_doris":
+                return ENGINE_DORIS;
+            case "max_compute":
+                return ENGINE_MAXCOMPUTE;
+            default:
+                return normalized;
         }
     }
 
-    public void invalidateDbCache(long catalogId, String dbName) {
-        dbName = ClusterNamespace.getNameFromFullName(dbName);
-        ExternalSchemaCache schemaCache = schemaCacheMgr.getCache(catalogId);
-        if (schemaCache != null) {
-            schemaCache.invalidateDbCache(dbName);
+    private void routeCatalogEngines(long catalogId, 
Consumer<ExternalMetaCache> action) {

Review Comment:
   **Design concern: Broadcasting to all engines is wasteful**
   
   `routeCatalogEngines` accepts a `catalogId` parameter but never uses it for routing — it simply iterates over ALL registered engine caches (currently 7: default, hive, hudi, iceberg, paimon, maxcompute, doris) for every operation. For `prepareCatalog()`, this means a Hive-only catalog gets Iceberg/Paimon/Hudi/MaxCompute/Doris cache infrastructure allocated (19 empty Caffeine `LoadingCache` instances per catalog).
   
   The memory impact is modest (~tens of KB per catalog from empty Caffeine 
caches), and `computeIfAbsent` makes repeated calls no-ops, so this is safe but 
wasteful.
   
   Consider:
   1. Filtering by the catalog's actual engine type (the information is available from `ExternalCatalog.getType()`), which would also give the unused `catalogId` parameter a purpose, or
   2. At minimum, adding a comment documenting this broadcast-to-all design as intentional, explaining the rationale (for example, that a single catalog might be accessed by tables from multiple engines)



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java:
##########
@@ -214,6 +213,13 @@ public String getEngine() {
         return getType().toEngineName();
     }
 
+    /**
+     * Returns the effective meta cache engine for this table.
+     */
+    public String getMetaCacheEngine() {

Review Comment:
   **Risk: No compile-time safety for `getMetaCacheEngine()` override**
   
   This method throws `IllegalStateException` by default. Any new 
`ExternalTable` subclass that forgets to override it will fail at runtime when 
`getSchemaCacheValue()` is called — there's no compile-time enforcement.
   
   Consider making this method `abstract` (which would force all subclasses to 
implement it), or at minimum providing a safe default return value like 
`"default"` (since `DefaultExternalMetaCache` exists as a fallback). Currently 
several table types (ES, JDBC, InfoSchema, LakeSoul, TrinoConnector, Test) 
already override to return `"default"`, suggesting that could be a reasonable 
default.



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/AbstractExternalMetaCache.java:
##########
@@ -0,0 +1,212 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.metacache;
+
+import org.apache.doris.common.Config;
+
+import com.google.common.collect.Maps;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Base implementation of {@link ExternalMetaCache}.
+ * It keeps the shared in-memory layout
+ * Map&lt;CatalogId, CatalogEntryGroup&gt;, implements default
+ * lifecycle behavior, and provides conservative invalidation fallback.
+ * Subclasses register entry definitions during construction and entries are
+ * initialized eagerly by {@link #initCatalog(long)}.
+ */
+public abstract class AbstractExternalMetaCache implements ExternalMetaCache {
+    protected static final CacheSpec DEFAULT_ENTRY_CACHE_SPEC =
+            CacheSpec.fromTtlValue(
+                    null,
+                    Config.external_cache_expire_time_seconds_after_access,
+                    Config.max_external_table_cache_num);
+
+    private final String engine;
+    private final ExecutorService refreshExecutor;
+    private final Map<Long, CatalogEntryGroup> catalogEntries = 
Maps.newConcurrentMap();
+    private final Map<String, MetaCacheEntryDef<?, ?>> metaCacheEntryDefs = 
Maps.newConcurrentMap();
+
+    protected AbstractExternalMetaCache(String engine, ExecutorService 
refreshExecutor) {
+        this.engine = engine;
+        this.refreshExecutor = Objects.requireNonNull(refreshExecutor, 
"refreshExecutor can not be null");
+    }
+
+    @Override
+    public String engine() {
+        return engine;
+    }
+
+    @Override
+    public void initCatalog(long catalogId, Map<String, String> 
catalogProperties) {
+        Map<String, String> safeCatalogProperties = 
CacheSpec.applyCompatibilityMap(
+                catalogProperties, catalogPropertyCompatibilityMap());
+        catalogEntries.computeIfAbsent(catalogId, id -> 
buildCatalogEntryGroup(safeCatalogProperties));
+    }
+
+    @Override
+    public void checkCatalogInitialized(long catalogId) {
+        requireCatalogEntryGroup(catalogId);
+    }
+
+    /**
+     * Optional compatibility map for legacy catalog properties.
+     *
+     * <p>Map format: {@code legacyKey -> newKey}. The mapping is applied 
before
+     * entry cache specs are parsed. If both keys exist, new key keeps 
precedence.
+     */
+    protected Map<String, String> catalogPropertyCompatibilityMap() {
+        return Collections.emptyMap();
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <K, V> MetaCacheEntry<K, V> entry(long catalogId, String entryName, 
Class<K> keyType, Class<V> valueType) {
+        CatalogEntryGroup group = requireCatalogEntryGroup(catalogId);
+        MetaCacheEntryDef<?, ?> def = requireMetaCacheEntryDef(entryName);
+        ensureTypeCompatible(def, keyType, valueType);
+
+        MetaCacheEntry<?, ?> cacheEntry = group.get(entryName);
+        if (cacheEntry == null) {
+            throw new IllegalStateException(String.format(
+                    "Entry '%s' is not initialized for engine '%s', catalog 
%d.",
+                    entryName, engine, catalogId));
+        }
+        return (MetaCacheEntry<K, V>) cacheEntry;
+    }
+
+    @Override
+    public void invalidateCatalog(long catalogId) {
+        CatalogEntryGroup removed = catalogEntries.remove(catalogId);
+        if (removed != null) {
+            removed.invalidateAll();
+        }
+    }
+
+    @Override
+    public void invalidateCatalogEntries(long catalogId) {
+        CatalogEntryGroup group = catalogEntries.get(catalogId);
+        if (group != null) {
+            group.invalidateAll();
+        }
+    }
+
+    @Override
+    public void invalidateDb(long catalogId, String dbName) {
+        // Generic key mapping is unknown in base class; fallback to 
catalog-scope invalidation.
+        invalidateCatalog(catalogId);
+    }
+
+    @Override
+    public void invalidateTable(long catalogId, String dbName, String 
tableName) {
+        // Generic key mapping is unknown in base class; fallback to 
catalog-scope invalidation.
+        invalidateCatalog(catalogId);
+    }
+
+    @Override
+    public void invalidatePartitions(long catalogId, String dbName, String 
tableName, List<String> partitions) {
+        // Generic key mapping is unknown in base class; fallback to 
catalog-scope invalidation.
+        invalidateCatalog(catalogId);
+    }
+
+    @Override
+    public Map<String, Map<String, String>> stats(long catalogId) {
+        CatalogEntryGroup group = catalogEntries.get(catalogId);
+        return group == null ? Maps.newHashMap() : group.stats();
+    }
+
+    @Override
+    public void close() {
+        catalogEntries.values().forEach(CatalogEntryGroup::invalidateAll);
+        catalogEntries.clear();
+    }
+
+    protected final <K, V> void registerMetaCacheEntryDef(MetaCacheEntryDef<K, 
V> entryDef) {
+        Objects.requireNonNull(entryDef, "entryDef");
+        if (!catalogEntries.isEmpty()) {
+            throw new IllegalStateException(
+                    String.format("Can not register entry '%s' after catalog 
initialization for engine '%s'.",
+                            entryDef.getName(), engine));
+        }
+        MetaCacheEntryDef<?, ?> existing = 
metaCacheEntryDefs.putIfAbsent(entryDef.getName(), entryDef);
+        if (existing != null) {
+            throw new IllegalArgumentException(
+                    String.format("Duplicated entry definition '%s' for engine 
'%s'.", entryDef.getName(), engine));
+        }
+    }
+
+    protected final <K, V> MetaCacheEntry<K, V> entry(long catalogId, 
MetaCacheEntryDef<K, V> entryDef) {
+        validateRegisteredMetaCacheEntryDef(entryDef);
+        return entry(catalogId, entryDef.getName(), entryDef.getKeyType(), 
entryDef.getValueType());
+    }
+
+    private CatalogEntryGroup requireCatalogEntryGroup(long catalogId) {
+        CatalogEntryGroup group = catalogEntries.get(catalogId);
+        if (group == null) {
+            throw new IllegalStateException(String.format(
+                    "Catalog %d is not initialized for engine '%s'. Call 
initCatalog first.",
+                    catalogId, engine));
+        }
+        return group;
+    }
+
+    private MetaCacheEntryDef<?, ?> requireMetaCacheEntryDef(String entryName) 
{
+        MetaCacheEntryDef<?, ?> entryDef = metaCacheEntryDefs.get(entryName);
+        if (entryDef == null) {
+            throw new IllegalArgumentException(String.format(
+                    "Entry '%s' is not registered for engine '%s'.", 
entryName, engine));
+        }
+        return entryDef;
+    }
+
+    private void ensureTypeCompatible(MetaCacheEntryDef<?, ?> entryDef, 
Class<?> keyType, Class<?> valueType) {
+        if (!entryDef.getKeyType().equals(keyType) || 
!entryDef.getValueType().equals(valueType)) {

Review Comment:
   **Bug: Strict `equals()` breaks subclass key types**
   
   Using `Class.equals()` here means that if a `SchemaCacheKey` subclass (e.g., 
`IcebergSchemaCacheKey`) is registered as the key type in a 
`MetaCacheEntryDef`, but the caller passes the base `SchemaCacheKey.class` (or 
vice versa), this check will throw.
   
   This creates a fragile coupling between the registered key type and the 
runtime type passed through `entry()`. Consider using `isAssignableFrom()` 
instead:
   ```java
   if (!entryDef.getKeyType().isAssignableFrom(keyType) || 
!entryDef.getValueType().isAssignableFrom(valueType)) {
   ```
   This would allow a subclass key to be accepted when the entry def registers a 
parent type. Note the check is one-directional: with the operand order shown, a 
caller-passed subclass of the registered type passes, but a registered subclass 
would still reject a caller-passed parent type — swap the operands if that 
opposite direction is the one needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to