okumin commented on code in PR #6441:
URL: https://github.com/apache/hive/pull/6441#discussion_r3226537976


##########
standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java:
##########
@@ -33,65 +51,423 @@
 import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.hive.MetadataLocator;
 import org.apache.iceberg.view.View;
 import org.apache.iceberg.view.ViewBuilder;
+import org.jetbrains.annotations.TestOnly;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.github.benmanes.caffeine.cache.Ticker;
 
 /**
  * Class that wraps an Iceberg Catalog to cache tables.
  */
-public class HMSCachingCatalog extends CachingCatalog implements 
SupportsNamespaces, ViewCatalog {
+public class HMSCachingCatalog extends CachingCatalog
+    implements SupportsNamespaces, ViewCatalog, HMSCachingCatalogMXBean, 
Closeable {
+  protected static final Logger LOG = 
LoggerFactory.getLogger(HMSCachingCatalog.class);
+
+  @TestOnly
+  private static SoftReference<HMSCachingCatalog> cacheRef = new 
SoftReference<>(null);
+
+  @TestOnly
+  @SuppressWarnings("unchecked")
+  public static <C extends Catalog> C 
getLatestCache(Function<HMSCachingCatalog, C> extractor) {
+    HMSCachingCatalog cache = cacheRef.get();
+    if (cache == null) {
+      return null;
+    }
+    return extractor == null ? (C) cache : extractor.apply(cache);
+  }
+
+  @TestOnly
+  public HiveCatalog getCatalog() {
+    return hiveCatalog;
+  }
+
+  // The underlying HiveCatalog instance.
   private final HiveCatalog hiveCatalog;
-  
-  public HMSCachingCatalog(HiveCatalog catalog, long expiration) {
-    super(catalog, true, expiration, Ticker.systemTicker());
+  // Duplicate because CachingCatalog doesn't expose the case sensitivity of 
the underlying catalog,
+  // which is needed for canonicalizing identifiers before caching.
+  private final boolean caseSensitive;
+  // The locator.
+  private final MetadataLocator metadataLocator;
+  // An L1 small latency cache.
+  // This is used to cache the last cached time for each table identifier,
+  // so that we can skip location check for repeated access to the same table 
within a short period of time,
+  // which can significantly reduce the latency for repeated access to the 
same table.
+  private final Map<TableIdentifier, Long> l1Cache;
+  // The TTL for L1 cache (3s).
+  private final int l1Ttl;
+  // The L1 cache size.
+  private final int l1CacheSize;
+
+  // Metrics counters.
+  private final AtomicLong cacheHitCount = new AtomicLong(0);
+  private final AtomicLong cacheMissCount = new AtomicLong(0);
+  private final AtomicLong cacheLoadCount = new AtomicLong(0);
+  private final AtomicLong cacheInvalidateCount = new AtomicLong(0);
+  private final AtomicLong cacheMetaLoadCount = new AtomicLong(0);
+  // L1 cache metrics: counted only when the L2 (Caffeine) cache already has 
the entry.
+  private final AtomicLong l1CacheHitCount = new AtomicLong(0);
+  private final AtomicLong l1CacheMissCount = new AtomicLong(0);
+
+  // JMX ObjectName under which this instance is registered (may be null if 
registration failed).
+  private ObjectName jmxObjectName;
+
+  public HMSCachingCatalog(HiveCatalog catalog, long expirationMs) {
+    this(catalog, expirationMs, /*caseSensitive*/ true);
+  }
+
+  public HMSCachingCatalog(HiveCatalog catalog, long expirationMs, boolean 
caseSensitive) {
+    super(catalog, caseSensitive, expirationMs, Ticker.systemTicker());
     this.hiveCatalog = catalog;
+    this.caseSensitive = caseSensitive;
+    this.metadataLocator = new MetadataLocator(catalog);
+    Configuration conf = catalog.getConf();
+    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST)) {
+      // Only keep a reference to the latest cache for testing purpose, so 
that tests can manipulate the catalog.
+      cacheRef = new SoftReference<>(this);
+    }
+    int l1size = conf.getInt("hms.caching.catalog.l1.cache.size", 32);
+    int l1ttl = conf.getInt("hms.caching.catalog.l1.cache.ttl", 3_000);
+    if (l1size > 0 && l1ttl > 0) {
+      l1Cache = Collections.synchronizedMap(new LinkedHashMap<TableIdentifier, 
Long>() {
+        @Override
+        protected boolean removeEldestEntry(Map.Entry<TableIdentifier, Long> 
eldest) {
+          return size() > l1CacheSize;
+        }
+      });
+      l1Ttl = l1ttl;
+      l1CacheSize = l1size;
+    } else {
+      l1Cache = Collections.emptyMap();
+      l1Ttl = 0;
+      l1CacheSize = 0;
+    }
+    registerJmx(catalog.name());
+  }
+
+  /**
+   * Registers this instance as a JMX MBean.
+   *
+   * @param catalogName the catalog name, used to build the {@link ObjectName}
+   */
+  private void registerJmx(String catalogName) {
+    try {
+      MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+      String sanitized = catalogName == null || catalogName.isEmpty()
+        ? "default"
+        : catalogName.replaceAll("[^a-zA-Z0-9.\\\\-]", "_");
+      ObjectName name = new 
ObjectName("org.apache.iceberg.rest:type=HMSCachingCatalog,name=" + sanitized);
+      if (mbs.isRegistered(name)) {
+        mbs.unregisterMBean(name);
+      }
+      mbs.registerMBean(this, name);
+      this.jmxObjectName = name;
+      LOG.info("Registered JMX MBean: {}", name);
+    } catch (JMException e) {
+      LOG.warn("Failed to register JMX MBean for HMSCachingCatalog", e);
+    }
+  }
+
+  /**
+   * Callback when cache invalidates the entry for a given table identifier.
+   *
+   * @param tid the table identifier to invalidate
+   */
+  protected void onCacheInvalidate(TableIdentifier tid) {
+    long count = cacheInvalidateCount.incrementAndGet();
+    LOG.debug("Cache invalidate {}: {}", tid, count);
+  }
+
+  /**
+   * Callback when cache loads a table for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheLoad(TableIdentifier tid) {
+    long count = cacheLoadCount.incrementAndGet();
+    LOG.debug("Cache load {}: {}", tid, count);
+  }
+
+  /**
+   * Callback when cache hit for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheHit(TableIdentifier tid) {
+    long count = cacheHitCount.incrementAndGet();
+    LOG.debug("Cache hit {} : {}", tid, count);
+  }
+
+  /**
+   * Callback when cache miss occurs for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheMiss(TableIdentifier tid) {
+    long count = cacheMissCount.incrementAndGet();
+    LOG.debug("Cache miss {}: {}", tid, count);
+  }
+
+  /**
+   * Callback when cache loads a metadata table for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheMetaLoad(TableIdentifier tid) {
+    long count = cacheMetaLoadCount.incrementAndGet();
+    LOG.debug("Cache meta-load {}: {}", tid, count);
+  }
+
+  /**
+   * Callback when an L1 cache hit occurs for a given table identifier.
+   * Only fired when the L2 cache also has the entry.
+   *
+   * @param tid the table identifier
+   */
+  protected void onL1CacheHit(TableIdentifier tid) {
+    long count = l1CacheHitCount.incrementAndGet();
+    LOG.debug("L1 cache hit {}: {}", tid, count);
   }
 
+  /**
+   * Callback when an L1 cache miss occurs for a given table identifier.
+   * Only fired when the L2 cache has the entry but L1 is absent or expired.
+   *
+   * @param tid the table identifier
+   */
+  protected void onL1CacheMiss(TableIdentifier tid) {
+    long count = l1CacheMissCount.incrementAndGet();
+    LOG.debug("L1 cache miss {}: {}", tid, count);
+  }
+
+  // Getter methods for accessing metrics
   @Override
-  public Catalog.TableBuilder buildTable(TableIdentifier identifier, Schema 
schema) {
-    return hiveCatalog.buildTable(identifier, schema);
+  public long getCacheHitCount() {
+    return cacheHitCount.get();
   }
 
   @Override
-  public void createNamespace(Namespace nmspc, Map<String, String> map) {
-    hiveCatalog.createNamespace(nmspc, map);
+  public long getCacheMissCount() {
+    return cacheMissCount.get();
   }
 
   @Override
-  public List<Namespace> listNamespaces(Namespace nmspc) throws 
NoSuchNamespaceException {
-    return hiveCatalog.listNamespaces(nmspc);
+  public long getCacheLoadCount() {
+    return cacheLoadCount.get();
   }
 
   @Override
-  public Map<String, String> loadNamespaceMetadata(Namespace nmspc) throws 
NoSuchNamespaceException {
-    return hiveCatalog.loadNamespaceMetadata(nmspc);
+  public long getCacheInvalidateCount() {
+    return cacheInvalidateCount.get();
   }
 
   @Override
-  public boolean dropNamespace(Namespace nmspc) throws 
NamespaceNotEmptyException {
-    List<TableIdentifier> tables = listTables(nmspc);
+  public long getCacheMetaLoadCount() {
+    return cacheMetaLoadCount.get();
+  }
+
+  @Override
+  public double getCacheHitRate() {
+    long hits = cacheHitCount.get();
+    long total = hits + cacheMissCount.get();
+    return total == 0 ? 0.0 : (double) hits / total;
+  }
+
+  @Override
+  public long getL1CacheHitCount() {
+    return l1CacheHitCount.get();
+  }
+
+  @Override
+  public long getL1CacheMissCount() {
+    return l1CacheMissCount.get();
+  }
+
+  @Override
+  public double getL1CacheHitRate() {
+    long hits = l1CacheHitCount.get();
+    long total = hits + l1CacheMissCount.get();
+    return total == 0 ? 0.0 : (double) hits / total;
+  }
+
+  @Override
+  public void resetCacheStats() {
+    cacheHitCount.set(0);
+    cacheMissCount.set(0);
+    cacheLoadCount.set(0);
+    cacheInvalidateCount.set(0);
+    cacheMetaLoadCount.set(0);
+    l1CacheHitCount.set(0);
+    l1CacheMissCount.set(0);
+    LOG.debug("Cache stats reset");
+  }
+
+  @Override
+  public void close() {
+    unregisterJmx();
+  }
+
+  /**
+   * Unregisters this instance from the platform MBeanServer.
+   */
+  private void unregisterJmx() {
+    if (jmxObjectName != null) {
+      try {
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        if (mbs.isRegistered(jmxObjectName)) {
+          mbs.unregisterMBean(jmxObjectName);
+          LOG.info("Unregistered JMX MBean: {}", jmxObjectName);
+        }
+      } catch (JMException e) {
+        LOG.warn("Failed to unregister JMX MBean: {}", jmxObjectName, e);
+      } finally {
+        jmxObjectName = null;
+      }
+    }
+  }
+
+  @Override
+  public void createNamespace(Namespace namespace, Map<String, String> map) {
+    hiveCatalog.createNamespace(namespace, map);
+  }
+
+  @Override
+  public List<Namespace> listNamespaces(Namespace namespace) throws 
NoSuchNamespaceException {
+    return hiveCatalog.listNamespaces(namespace);
+  }
+
+  /**
+   * Canonicalizes the given table identifier based on the case sensitivity of 
the underlying catalog.
+   * Copied from CachingCatalog that exposes it as private.
+   * @param tableIdentifier the table identifier to canonicalize
+   * @return the canonicalized table identifier
+   */
+  private TableIdentifier canonicalizeIdentifier(TableIdentifier 
tableIdentifier) {
+    return this.caseSensitive ? tableIdentifier : 
tableIdentifier.toLowerCase();
+  }
+
+  @Override
+  public void invalidateTable(TableIdentifier ident) {
+    super.invalidateTable(ident);
+    l1Cache.remove(ident);
+  }
+
+  @Override
+  public Table loadTable(final TableIdentifier identifier) {
+    final TableIdentifier canonicalized = canonicalizeIdentifier(identifier);
+    final Table cachedTable = tableCache.getIfPresent(canonicalized);
+    long now = System.currentTimeMillis();
+    if (cachedTable != null) {
+      // Determine if L1 cache is valid based on the last cached time and the 
TTL.
+      // If the table is in L1 cache, we can skip the location check and 
return the cached table directly,
+      // which can significantly reduce the latency for repeated access to the 
same table.
+      Long lastCached = l1Cache.get(canonicalized);
+      if (lastCached != null) {
+        if (now - lastCached < l1Ttl) {
+          LOG.debug("Table {} is in L1 cache, returning cached table", 
canonicalized);
+          onL1CacheHit(canonicalized);
+          onCacheHit(canonicalized);
+          return cachedTable;

Review Comment:
   Can we move the L1 cache work out of scope for this PR? Since CachingCatalog 
is designed for client-side caching, it performs no additional authorization 
checks on cached objects. This means that if Alice successfully loads the `abc` 
table into the cache and Bob then fetches it, the security boundary can be 
breached. That never happens on the client side, because a catalog initialized 
by Alice is only ever used by Alice. On the server side (HMS), that is not 
always true. I'm not 100% confident that the current implementation is safe 
enough.
   If we could split this PR, I could merge the majority of it quickly and 
then closely review the security model for the advanced feature in a second 
PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to