github-actions[bot] commented on code in PR #60937:
URL: https://github.com/apache/doris/pull/60937#discussion_r2964020045
##########
fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java:
##########
@@ -161,199 +156,224 @@ public ExecutorService getScheduleExecutor() {
return scheduleExecutor;
}
- public HiveMetaStoreCache getMetaStoreCache(HMSExternalCatalog catalog) {
- return hiveMetaStoreCacheMgr.getCache(catalog);
+ ExternalMetaCache engine(String engine) {
+ return cacheRegistry.resolve(engine);
}
- public ExternalSchemaCache getSchemaCache(ExternalCatalog catalog) {
- return schemaCacheMgr.getCache(catalog);
+ public HiveExternalMetaCache hive(long catalogId) {
+ prepareCatalogByEngine(catalogId, ENGINE_HIVE);
+ return (HiveExternalMetaCache) engine(ENGINE_HIVE);
}
- public HudiPartitionProcessor getHudiPartitionProcess(ExternalCatalog
catalog) {
- return hudiMetadataCacheMgr.getPartitionProcessor(catalog);
+ public HudiExternalMetaCache hudi(long catalogId) {
+ prepareCatalogByEngine(catalogId, ENGINE_HUDI);
+ return (HudiExternalMetaCache) engine(ENGINE_HUDI);
}
- public HudiCachedFsViewProcessor getFsViewProcessor(ExternalCatalog
catalog) {
- return hudiMetadataCacheMgr.getFsViewProcessor(catalog);
+ public IcebergExternalMetaCache iceberg(long catalogId) {
+ prepareCatalogByEngine(catalogId, ENGINE_ICEBERG);
+ return (IcebergExternalMetaCache) engine(ENGINE_ICEBERG);
}
- public HudiCachedMetaClientProcessor
getMetaClientProcessor(ExternalCatalog catalog) {
- return hudiMetadataCacheMgr.getHudiMetaClientProcessor(catalog);
+ public PaimonExternalMetaCache paimon(long catalogId) {
+ prepareCatalogByEngine(catalogId, ENGINE_PAIMON);
+ return (PaimonExternalMetaCache) engine(ENGINE_PAIMON);
}
- public HudiMetadataCacheMgr getHudiMetadataCacheMgr() {
- return hudiMetadataCacheMgr;
+ public MaxComputeExternalMetaCache maxCompute(long catalogId) {
+ prepareCatalogByEngine(catalogId, ENGINE_MAXCOMPUTE);
+ return (MaxComputeExternalMetaCache) engine(ENGINE_MAXCOMPUTE);
}
- public IcebergMetadataCache getIcebergMetadataCache(ExternalCatalog
catalog) {
- return icebergMetadataCacheMgr.getCache(catalog);
+ public DorisExternalMetaCache doris(long catalogId) {
+ prepareCatalogByEngine(catalogId, ENGINE_DORIS);
+ return (DorisExternalMetaCache) engine(ENGINE_DORIS);
}
- public PaimonMetadataCache getPaimonMetadataCache(ExternalCatalog catalog)
{
- return paimonMetadataCacheMgr.getCache(catalog);
+ public void prepareCatalog(long catalogId) {
+ Map<String, String> catalogProperties =
getCatalogProperties(catalogId);
+ routeCatalogEngines(catalogId, cache -> cache.initCatalog(catalogId,
catalogProperties));
}
- public MaxComputeMetadataCache getMaxComputeMetadataCache(long catalogId) {
- return
maxComputeMetadataCacheMgr.getMaxComputeMetadataCache(catalogId);
+ public void prepareCatalogByEngine(long catalogId, String engine) {
+ prepareCatalogByEngine(catalogId, engine,
getCatalogProperties(catalogId));
}
- public FileSystemCache getFsCache() {
- return fsCache;
+ public void prepareCatalogByEngine(long catalogId, String engine,
Map<String, String> catalogProperties) {
+ Map<String, String> safeCatalogProperties = catalogProperties == null
+ ? Maps.newHashMap()
+ : Maps.newHashMap(catalogProperties);
+ routeSpecifiedEngine(engine, cache -> cache.initCatalog(catalogId,
safeCatalogProperties));
}
- public ExternalRowCountCache getRowCountCache() {
- return rowCountCache;
+ public void invalidateCatalog(long catalogId) {
+ routeCatalogEngines(catalogId, cache -> safeInvalidate(
+ cache, catalogId, "invalidateCatalog",
+ () -> cache.invalidateCatalogEntries(catalogId)));
}
- public DorisExternalMetaCacheMgr getDorisExternalMetaCacheMgr() {
- return dorisExternalMetaCacheMgr;
+ public void invalidateCatalogByEngine(long catalogId, String engine) {
+ routeSpecifiedEngine(engine, cache -> safeInvalidate(
+ cache, catalogId, "invalidateCatalogByEngine",
+ () -> cache.invalidateCatalogEntries(catalogId)));
}
- public void removeCache(long catalogId) {
- if (hiveMetaStoreCacheMgr.removeCache(catalogId) != null) {
- LOG.info("remove hive metastore cache for catalog {}", catalogId);
- }
- if (schemaCacheMgr.removeCache(catalogId) != null) {
- LOG.info("remove schema cache for catalog {}", catalogId);
- }
- if (icebergMetadataCacheMgr.removeCache(catalogId) != null) {
- LOG.info("remove iceberg meta cache for catalog {}", catalogId);
- }
- hudiMetadataCacheMgr.removeCache(catalogId);
- maxComputeMetadataCacheMgr.removeCache(catalogId);
- PaimonMetadataCache paimonMetadataCache =
paimonMetadataCacheMgr.removeCache(catalogId);
- if (paimonMetadataCache != null) {
- paimonMetadataCache.invalidateCatalogCache(catalogId);
- }
- dorisExternalMetaCacheMgr.removeCache(catalogId);
+ public void removeCatalog(long catalogId) {
+ routeCatalogEngines(catalogId, cache -> safeInvalidate(
+ cache, catalogId, "removeCatalog",
+ () -> cache.invalidateCatalog(catalogId)));
}
- public void invalidateTableCache(ExternalTable dorisTable) {
- ExternalSchemaCache schemaCache =
schemaCacheMgr.getCache(dorisTable.getCatalog().getId());
- if (schemaCache != null) {
- schemaCache.invalidateTableCache(dorisTable);
- }
- HiveMetaStoreCache hiveMetaCache =
hiveMetaStoreCacheMgr.getCache(dorisTable.getCatalog().getId());
- if (hiveMetaCache != null) {
-
hiveMetaCache.invalidateTableCache(dorisTable.getOrBuildNameMapping());
- }
- IcebergMetadataCache icebergMetadataCache =
icebergMetadataCacheMgr.getCache(dorisTable.getCatalog().getId());
- if (icebergMetadataCache != null) {
- icebergMetadataCache.invalidateTableCache(dorisTable);
- }
- hudiMetadataCacheMgr.invalidateTableCache(dorisTable);
- maxComputeMetadataCacheMgr.invalidateTableCache(dorisTable);
- PaimonMetadataCache paimonMetadataCache =
paimonMetadataCacheMgr.getCache(dorisTable.getCatalog().getId());
- if (paimonMetadataCache != null) {
- paimonMetadataCache.invalidateTableCache(dorisTable);
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("invalid table cache for {}.{} in catalog {}",
dorisTable.getRemoteDbName(),
- dorisTable.getRemoteName(),
dorisTable.getCatalog().getName());
- }
+ public void removeCatalogByEngine(long catalogId, String engine) {
+ routeSpecifiedEngine(engine, cache -> safeInvalidate(
+ cache, catalogId, "removeCatalogByEngine",
+ () -> cache.invalidateCatalog(catalogId)));
}
- public void invalidateDbCache(long catalogId, String dbName) {
- ExternalSchemaCache schemaCache = schemaCacheMgr.getCache(catalogId);
- if (schemaCache != null) {
- schemaCache.invalidateDbCache(dbName);
- }
- HiveMetaStoreCache metaCache =
hiveMetaStoreCacheMgr.getCache(catalogId);
- if (metaCache != null) {
- metaCache.invalidateDbCache(dbName);
- }
- IcebergMetadataCache icebergMetadataCache =
icebergMetadataCacheMgr.getCache(catalogId);
- if (icebergMetadataCache != null) {
- icebergMetadataCache.invalidateDbCache(catalogId, dbName);
- }
- hudiMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
- maxComputeMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
- PaimonMetadataCache paimonMetadataCache =
paimonMetadataCacheMgr.getCache(catalogId);
- if (paimonMetadataCache != null) {
- paimonMetadataCache.invalidateDbCache(catalogId, dbName);
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("invalid db cache for {} in catalog {}", dbName,
catalogId);
- }
+ public void invalidateDb(long catalogId, String dbName) {
+ routeCatalogEngines(catalogId, cache -> safeInvalidate(
+ cache, catalogId, "invalidateDb", () ->
cache.invalidateDb(catalogId, dbName)));
+ }
+
+ public void invalidateTable(long catalogId, String dbName, String
tableName) {
+ routeCatalogEngines(catalogId, cache -> safeInvalidate(
+ cache, catalogId, "invalidateTable",
+ () -> cache.invalidateTable(catalogId, dbName, tableName)));
+ }
+
+ public void invalidateTableByEngine(long catalogId, String engine, String
dbName, String tableName) {
+ routeSpecifiedEngine(engine, cache -> safeInvalidate(
+ cache, catalogId, "invalidateTableByEngine",
+ () -> cache.invalidateTable(catalogId, dbName, tableName)));
+ }
+
+ public void invalidatePartitions(long catalogId,
+ String dbName, String tableName, List<String> partitions) {
+ routeCatalogEngines(catalogId, cache -> safeInvalidate(
+ cache, catalogId, "invalidatePartitions",
+ () -> cache.invalidatePartitions(catalogId, dbName, tableName,
partitions)));
}
- public void invalidateCatalogCache(long catalogId) {
- schemaCacheMgr.removeCache(catalogId);
- HiveMetaStoreCache metaCache =
hiveMetaStoreCacheMgr.getCache(catalogId);
- if (metaCache != null) {
- metaCache.invalidateAll();
+ public List<CatalogMetaCacheStats> getCatalogCacheStats(long catalogId) {
+ List<CatalogMetaCacheStats> stats = new ArrayList<>();
+ cacheRegistry.allCaches().forEach(externalMetaCache ->
externalMetaCache.stats(catalogId)
+ .forEach((entryName, entryStats) -> stats.add(
+ new CatalogMetaCacheStats(externalMetaCache.engine(),
entryName, entryStats))));
+ stats.sort(Comparator.comparing(CatalogMetaCacheStats::getEngineName)
+ .thenComparing(CatalogMetaCacheStats::getEntryName));
+ return stats;
+ }
+
+ public static final class CatalogMetaCacheStats {
+ private final String engineName;
+ private final String entryName;
+ private final MetaCacheEntryStats entryStats;
+
+ public CatalogMetaCacheStats(String engineName, String entryName,
MetaCacheEntryStats entryStats) {
+ this.engineName = Objects.requireNonNull(engineName, "engineName");
+ this.entryName = Objects.requireNonNull(entryName, "entryName");
+ this.entryStats = Objects.requireNonNull(entryStats, "entryStats");
}
- IcebergMetadataCache icebergMetadataCache =
icebergMetadataCacheMgr.getCache(catalogId);
- if (icebergMetadataCache != null) {
- icebergMetadataCache.invalidateCatalogCache(catalogId);
+
+ public String getEngineName() {
+ return engineName;
}
- hudiMetadataCacheMgr.invalidateCatalogCache(catalogId);
- maxComputeMetadataCacheMgr.invalidateCatalogCache(catalogId);
- PaimonMetadataCache paimonMetadataCache =
paimonMetadataCacheMgr.getCache(catalogId);
- if (paimonMetadataCache != null) {
- paimonMetadataCache.invalidateCatalogCache(catalogId);
+
+ public String getEntryName() {
+ return entryName;
}
- dorisExternalMetaCacheMgr.invalidateCatalogCache(catalogId);
- if (LOG.isDebugEnabled()) {
- LOG.debug("invalid catalog cache for {}", catalogId);
+
+ public MetaCacheEntryStats getEntryStats() {
+ return entryStats;
}
}
- public void invalidSchemaCache(long catalogId) {
- schemaCacheMgr.removeCache(catalogId);
+ private void initEngineCaches() {
+ registerBuiltinEngineCaches();
+ }
+
+ private void registerBuiltinEngineCaches() {
+ cacheRegistry.register(new DefaultExternalMetaCache(ENGINE_DEFAULT,
commonRefreshExecutor));
+ cacheRegistry.register(new
HiveExternalMetaCache(commonRefreshExecutor, fileListingExecutor));
+ cacheRegistry.register(new
HudiExternalMetaCache(commonRefreshExecutor));
+ cacheRegistry.register(new
IcebergExternalMetaCache(commonRefreshExecutor));
+ cacheRegistry.register(new
PaimonExternalMetaCache(commonRefreshExecutor));
+ cacheRegistry.register(new
MaxComputeExternalMetaCache(commonRefreshExecutor));
+ cacheRegistry.register(new
DorisExternalMetaCache(commonRefreshExecutor));
+ }
+
+ private void routeCatalogEngines(long catalogId,
Consumer<ExternalMetaCache> action) {
+ routeResolver.resolveCatalogCaches(catalogId,
getCatalog(catalogId)).forEach(action);
+ }
+
+ private void routeSpecifiedEngine(String engine,
Consumer<ExternalMetaCache> action) {
+ action.accept(this.engine(engine));
}
- public void addPartitionsCache(long catalogId, HMSExternalTable table,
List<String> partitionNames) {
- String dbName = table.getDbName();
- HiveMetaStoreCache metaCache =
hiveMetaStoreCacheMgr.getCache(catalogId);
- if (metaCache != null) {
- List<Type> partitionColumnTypes;
- try {
- partitionColumnTypes =
table.getPartitionColumnTypes(MvccUtil.getSnapshotFromContext(table));
- } catch (NotSupportedException e) {
- LOG.warn("Ignore not supported hms table, message: {} ",
e.getMessage());
- return;
+ List<String> resolveCatalogEngineNamesForTest(@Nullable CatalogIf<?>
catalog, long catalogId) {
+ List<String> resolved = new ArrayList<>();
+ routeResolver.resolveCatalogCaches(catalogId, catalog).forEach(cache
-> resolved.add(cache.engine()));
+ return new ArrayList<>(resolved);
+ }
+
+ private void safeInvalidate(ExternalMetaCache cache, long catalogId,
String operation, Runnable action) {
+ if (!cache.isCatalogInitialized(catalogId)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("skip {} for catalog {} on engine '{}' because cache
entry is absent",
+ operation, catalogId, cache.engine());
}
- metaCache.addPartitionsCache(table.getOrBuildNameMapping(),
partitionNames, partitionColumnTypes);
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("add partition cache for {}.{} in catalog {}", dbName,
table.getName(), catalogId);
+ return;
}
+ action.run();
}
- public void dropPartitionsCache(long catalogId, HMSExternalTable table,
List<String> partitionNames) {
- String dbName = table.getDbName();
- HiveMetaStoreCache metaCache =
hiveMetaStoreCacheMgr.getCache(catalogId);
- if (metaCache != null) {
- metaCache.dropPartitionsCache(table, partitionNames, true);
+ private Map<String, String> getCatalogProperties(long catalogId) {
+ CatalogIf<?> catalog = getCatalog(catalogId);
+ if (catalog == null) {
+ throw new IllegalStateException(String.format("Catalog %d does not
exist.", catalogId));
Review Comment:
**[Concurrency]** `getCatalogProperties()` throws an unchecked
`IllegalStateException` when the catalog doesn't exist. This method is called
from `prepareCatalogByEngine()` (line 199), which in turn is invoked by
query-path methods such as `hive(catalogId)`, `iceberg(catalogId)`, etc.
Race scenario: A query thread calls `extMetaCacheMgr.hive(catalogId)` while
`CatalogMgr.removeCatalog()` is executing. The `removeCatalog` flow calls
`extMetaCacheMgr.removeCatalog()` at `CatalogMgr.java:140`, then removes from
`idToCatalog` at line 143. After line 143 completes, the query thread's
`getCatalog(catalogId)` returns `null`, and this line throws
`IllegalStateException`.
Since this is an unchecked exception on query paths, it will surface as an
internal error to users rather than a graceful "catalog not found" message.
Suggestion: Return an empty map or throw a checked exception (e.g.,
`DdlException`) that callers can convert to a user-friendly error.
Alternatively, make `prepareCatalogByEngine` a no-op if the catalog is gone.
##########
fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java:
##########
@@ -121,19 +125,23 @@ private void addCatalog(CatalogIf catalog) {
}
private CatalogIf removeCatalog(long catalogId) {
- CatalogIf catalog = idToCatalog.remove(catalogId);
- LOG.info("Removed catalog with id {}, name {}", catalogId, catalog ==
null ? "N/A" : catalog.getName());
- if (catalog != null) {
-
Env.getCurrentEnv().getRefreshManager().removeFromRefreshMap(catalogId);
- catalog.onClose();
-
Env.getCurrentEnv().getConstraintManager().dropCatalogConstraints(catalog.getName());
- nameToCatalog.remove(catalog.getName());
- if (ConnectContext.get() != null) {
- ConnectContext.get().removeLastDBOfCatalog(catalog.getName());
- }
-
Env.getCurrentEnv().getExtMetaCacheMgr().removeCache(catalog.getId());
- Env.getCurrentEnv().getQueryStats().clear(catalog.getId());
- }
+ CatalogIf catalog = idToCatalog.get(catalogId);
+ if (catalog == null) {
+ LOG.info("Removed catalog with id {}, name N/A", catalogId);
+ return null;
+ }
+
Env.getCurrentEnv().getRefreshManager().removeFromRefreshMap(catalogId);
+ catalog.onClose();
Review Comment:
**[Concurrency / Locking]** Two concerns with the ordering here:
1. **Torn-down state visible to readers:** `catalog.onClose()` is called at
line 134, but the catalog isn't removed from `idToCatalog`/`nameToCatalog`
until lines 142-143. Between these lines, concurrent readers (which do bare
`ConcurrentMap.get()` without acquiring `readLock`) can obtain a reference to a
catalog whose internal resources (`metadataOps`, thread pools, etc.) have
already been nulled/shutdown. This can cause NPEs in concurrent query threads.
2. **Blocking I/O under writeLock:** `onClose()` implementations (e.g.,
`HMSExternalCatalog.onClose()`) shut down thread pools and close HMS Thrift
connections — these are potentially blocking operations. The
`fe/fe-core/AGENTS.md` states: *"Metadata-locking paths avoid RPC, external IO,
and journal waits while holding catalog/database/table locks."*
Consider: (a) removing the catalog from the maps first and calling `onClose()`
outside the lock, or (b) marking the catalog as "closing" before teardown so
that concurrent readers receive a clean error instead of observing a
partially torn-down catalog.
##########
fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiExternalMetaCache.java:
##########
@@ -0,0 +1,238 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hudi;
+
+import org.apache.doris.common.util.Util;
+import org.apache.doris.datasource.CacheException;
+import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.NameMapping;
+import org.apache.doris.datasource.SchemaCacheValue;
+import org.apache.doris.datasource.TablePartitionValues;
+import org.apache.doris.datasource.hive.HMSExternalCatalog;
+import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
+import org.apache.doris.datasource.metacache.AbstractExternalMetaCache;
+import org.apache.doris.datasource.metacache.MetaCacheEntryDef;
+import org.apache.doris.datasource.metacache.MetaCacheEntryInvalidation;
+
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+/**
+ * Hudi engine implementation of {@link AbstractExternalMetaCache}.
+ *
+ * <p>Registered entries:
+ * <ul>
+ * <li>{@code partition}: partition metadata keyed by table identity +
snapshot timestamp + mode</li>
+ * <li>{@code fs_view}: {@link HoodieTableFileSystemView} keyed by {@link
NameMapping}</li>
+ * <li>{@code meta_client}: {@link HoodieTableMetaClient} keyed by {@link
NameMapping}</li>
+ * <li>{@code schema}: Hudi schema cache keyed by table identity +
timestamp</li>
+ * </ul>
+ *
+ * <p>Invalidation behavior:
+ * <ul>
+ * <li>db/table invalidation clears all four entries for matching keys</li>
+ * <li>partition-level invalidation currently falls back to table-level
invalidation</li>
+ * </ul>
+ */
+public class HudiExternalMetaCache extends AbstractExternalMetaCache {
+ private static final Logger LOG =
LogManager.getLogger(HudiExternalMetaCache.class);
+
+ public static final String ENGINE = "hudi";
+ public static final String ENTRY_PARTITION = "partition";
+ public static final String ENTRY_FS_VIEW = "fs_view";
+ public static final String ENTRY_META_CLIENT = "meta_client";
+ public static final String ENTRY_SCHEMA = "schema";
+
+ private final EntryHandle<HudiPartitionCacheKey, TablePartitionValues>
partitionEntry;
+ private final EntryHandle<HudiFsViewCacheKey, HoodieTableFileSystemView>
fsViewEntry;
+ private final EntryHandle<HudiMetaClientCacheKey, HoodieTableMetaClient>
metaClientEntry;
+ private final EntryHandle<HudiSchemaCacheKey, SchemaCacheValue>
schemaEntry;
+
+ public HudiExternalMetaCache(ExecutorService refreshExecutor) {
+ super(ENGINE, refreshExecutor);
+ partitionEntry = registerEntry(MetaCacheEntryDef.of(ENTRY_PARTITION,
HudiPartitionCacheKey.class,
+ TablePartitionValues.class,
this::loadPartitionValuesCacheValue, defaultEntryCacheSpec(),
+
MetaCacheEntryInvalidation.forNameMapping(HudiPartitionCacheKey::getNameMapping)));
+ fsViewEntry = registerEntry(MetaCacheEntryDef.of(ENTRY_FS_VIEW,
HudiFsViewCacheKey.class,
+ HoodieTableFileSystemView.class, this::createFsView,
defaultEntryCacheSpec(),
+
MetaCacheEntryInvalidation.forNameMapping(HudiFsViewCacheKey::getNameMapping)));
+ metaClientEntry =
registerEntry(MetaCacheEntryDef.of(ENTRY_META_CLIENT,
HudiMetaClientCacheKey.class,
+ HoodieTableMetaClient.class,
this::createHoodieTableMetaClient, defaultEntryCacheSpec(),
+
MetaCacheEntryInvalidation.forNameMapping(HudiMetaClientCacheKey::getNameMapping)));
+ schemaEntry = registerEntry(MetaCacheEntryDef.of(ENTRY_SCHEMA,
HudiSchemaCacheKey.class,
+ SchemaCacheValue.class, this::loadSchemaCacheValue,
defaultSchemaCacheSpec(),
+
MetaCacheEntryInvalidation.forNameMapping(HudiSchemaCacheKey::getNameMapping)));
+ }
+
+ public HoodieTableMetaClient getHoodieTableMetaClient(NameMapping
nameMapping) {
+ return
metaClientEntry.get(nameMapping.getCtlId()).get(HudiMetaClientCacheKey.of(nameMapping));
+ }
+
+ public HoodieTableFileSystemView getFsView(NameMapping nameMapping) {
+ return
fsViewEntry.get(nameMapping.getCtlId()).get(HudiFsViewCacheKey.of(nameMapping));
+ }
+
+ public HudiSchemaCacheValue getHudiSchemaCacheValue(NameMapping
nameMapping, long timestamp) {
+ SchemaCacheValue schemaCacheValue =
schemaEntry.get(nameMapping.getCtlId())
+ .get(new HudiSchemaCacheKey(nameMapping, timestamp));
+ return (HudiSchemaCacheValue) schemaCacheValue;
+ }
+
+ public TablePartitionValues getSnapshotPartitionValues(HMSExternalTable
table,
+ String timestamp, boolean useHiveSyncPartition) {
+ return partitionEntry.get(table.getCatalog().getId()).get(
+ HudiPartitionCacheKey.of(table.getOrBuildNameMapping(),
Long.parseLong(timestamp),
+ useHiveSyncPartition));
+ }
+
+ public TablePartitionValues getPartitionValues(HMSExternalTable table,
boolean useHiveSyncPartition)
+ throws CacheException {
+ HoodieTableMetaClient tableMetaClient =
getHoodieTableMetaClient(table.getOrBuildNameMapping());
+ TablePartitionValues emptyPartitionValues = new TablePartitionValues();
+ Option<String[]> partitionColumns =
tableMetaClient.getTableConfig().getPartitionFields();
+ if (!partitionColumns.isPresent() || partitionColumns.get().length ==
0) {
+ return emptyPartitionValues;
+ }
+ HoodieTimeline timeline =
tableMetaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
+ Option<HoodieInstant> lastInstant = timeline.lastInstant();
+ if (!lastInstant.isPresent()) {
+ return emptyPartitionValues;
+ }
+ long lastTimestamp = Long.parseLong(lastInstant.get().requestedTime());
+ return partitionEntry.get(table.getCatalog().getId()).get(
+ HudiPartitionCacheKey.of(table.getOrBuildNameMapping(),
lastTimestamp, useHiveSyncPartition));
+ }
+
+ private HoodieTableFileSystemView createFsView(HudiFsViewCacheKey key) {
+ HoodieTableMetaClient tableMetaClient =
metaClientEntry.get(key.getNameMapping().getCtlId())
+ .get(HudiMetaClientCacheKey.of(key.getNameMapping()));
+ HoodieMetadataConfig metadataConfig =
HoodieMetadataConfig.newBuilder().build();
+ HoodieLocalEngineContext ctx = new
HoodieLocalEngineContext(tableMetaClient.getStorageConf());
+ return FileSystemViewManager.createInMemoryFileSystemView(ctx,
tableMetaClient, metadataConfig);
+ }
+
+ private HoodieTableMetaClient
createHoodieTableMetaClient(HudiMetaClientCacheKey key) {
+ LOG.debug("create hudi table meta client for {}.{}",
key.getNameMapping().getFullLocalName());
Review Comment:
**[Minor / Pre-existing]** Format string mismatch: `"{}.{}"` has 2
placeholders but only 1 argument (`key.getNameMapping().getFullLocalName()`).
The second `{}` will appear literally in log output. This bug was carried over
from the old `HudiCachedMetaClientProcessor`.
Fix: Change to `"create hudi table meta client for {}"` since
`getFullLocalName()` already returns a dotted name like `db.table`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]