This is an automated email from the ASF dual-hosted git repository.

deniskuzZ pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c4a6187072 HIVE-29281: Make proactive cache eviction work with 
catalog (#6379)
4c4a6187072 is described below

commit 4c4a6187072ff2cd90c4dc4962a308200e6fdb66
Author: Neeraj Khatri <[email protected]>
AuthorDate: Mon Jun 15 17:45:07 2026 +0530

    HIVE-29281: Make proactive cache eviction work with catalog (#6379)
---
 .../apache/iceberg/orc/VectorizedReadUtils.java    |   2 +-
 llap-common/src/protobuf/LlapDaemonProtocol.proto  |   3 +-
 .../io/api/impl/LlapCacheMetadataSerializer.java   |  16 +-
 .../hadoop/hive/llap/io/api/impl/LlapIoImpl.java   |   8 +-
 .../hive/llap/io/encoded/OrcEncodedDataReader.java |   2 +-
 .../llap/io/encoded/SerDeEncodedDataReader.java    |   2 +-
 .../llap/io/metadata/OrcFileEstimateErrors.java    |   4 +-
 .../hive/llap/cache/TestCacheContentsTracker.java  |  59 ++--
 .../hadoop/hive/llap/cache/TestFileCache.java      |   3 +-
 .../hive/llap/cache/TestLowLevelCacheImpl.java     |   8 +-
 .../hive/llap/cache/TestOrcMetadataCache.java      |  19 +-
 .../hive/llap/cache/TestProactiveEviction.java     | 327 +++++++++++++++++++++
 .../api/impl/TestLlapCacheMetadataSerializer.java  |   2 +-
 .../org/apache/hadoop/hive/llap/LlapHiveUtils.java |  20 +-
 .../apache/hadoop/hive/llap/ProactiveEviction.java | 256 +++++++++-------
 .../ql/ddl/database/drop/DropDatabaseAnalyzer.java |   3 +
 .../ddl/database/drop/DropDatabaseOperation.java   |   2 +-
 .../hive/ql/ddl/table/drop/DropTableOperation.java |   2 +-
 .../drop/AlterTableDropPartitionOperation.java     |   3 +-
 .../org/apache/hadoop/hive/ql/exec/Utilities.java  |   3 +-
 .../ql/io/orc/VectorizedOrcAcidRowBatchReader.java |   2 +-
 .../vector/VectorizedParquetRecordReader.java      |   2 +-
 .../apache/hadoop/hive/ql/plan/PartitionDesc.java  |   7 +
 .../org/apache/hadoop/hive/ql/plan/TableDesc.java  |  31 +-
 .../clientpositive/llap/db_ddl_explain.q.out       |   1 +
 .../org/apache/hadoop/hive/common/io/CacheTag.java |  48 +--
 26 files changed, 640 insertions(+), 195 deletions(-)

diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java
index c05f8bc62ab..34b99d6b00b 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java
@@ -79,7 +79,7 @@ public static ByteBuffer getSerializedOrcTail(Path path, 
SyntheticFileId fileId,
       // Note: Since Hive doesn't know about partition information of Iceberg 
tables, partitionDesc is only used to
       // deduct the table (and DB) name here.
       CacheTag cacheTag = HiveConf.getBoolVar(job, 
HiveConf.ConfVars.LLAP_TRACK_CACHE_USAGE) ?
-          LlapHiveUtils.getDbAndTableNameForMetrics(path, true, partitionDesc) 
: null;
+          LlapHiveUtils.getCacheTag(path, true, partitionDesc) : null;
 
       try {
         // Schema has to be serialized and deserialized as it is passed 
between different packages of TypeDescription:
diff --git a/llap-common/src/protobuf/LlapDaemonProtocol.proto 
b/llap-common/src/protobuf/LlapDaemonProtocol.proto
index 641958aef8a..8b15f4392eb 100644
--- a/llap-common/src/protobuf/LlapDaemonProtocol.proto
+++ b/llap-common/src/protobuf/LlapDaemonProtocol.proto
@@ -233,10 +233,11 @@ message SetCapacityRequestProto {
 message SetCapacityResponseProto {
 }
 
-// Used for proactive eviction request. Must contain one DB name, and 
optionally table information.
+// Used for proactive eviction request. Must contain a DB name, and optionally 
table information and catalog name.
 message EvictEntityRequestProto {
   required string db_name = 1;
   repeated TableProto table = 2;
+  optional string catalog_name = 3 [default = "hive"];
 }
 
 // Used in EvictEntityRequestProto, can be used for non-partitioned and 
partitioned tables too.
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapCacheMetadataSerializer.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapCacheMetadataSerializer.java
index dcb90ec197d..aa3b6fdd066 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapCacheMetadataSerializer.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapCacheMetadataSerializer.java
@@ -33,6 +33,7 @@
 import org.apache.hadoop.hive.llap.io.encoded.LlapOrcCacheLoader;
 import org.apache.hadoop.hive.ql.io.SyntheticFileId;
 import org.apache.hadoop.hive.ql.io.orc.encoded.IoTrace;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
 import org.apache.hive.common.util.FixedSizedObjectPool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -149,7 +150,7 @@ public void 
loadData(LlapDaemonProtocolProtos.CacheEntryList data) {
   }
 
   private void loadData(LlapDaemonProtocolProtos.CacheEntry ce) throws 
IOException {
-    CacheTag cacheTag = decodeCacheTag(ce.getCacheTag());
+    CacheTag cacheTag = decodeCacheTag(ce.getCacheTag(), conf);
     DiskRangeList ranges = decodeRanges(ce.getRangesList());
     Object fileKey = decodeFileKey(ce.getFileKey());
     try (LlapOrcCacheLoader llr = new LlapOrcCacheLoader(new 
Path(ce.getFilePath()), fileKey, conf, cache,
@@ -167,9 +168,16 @@ private static DiskRangeList 
decodeRanges(List<LlapDaemonProtocolProtos.CacheEnt
     return helper.get();
   }
 
-  private static CacheTag decodeCacheTag(LlapDaemonProtocolProtos.CacheTag ct) 
{
-    return ct.getPartitionDescCount() == 0 ? CacheTag.build(ct.getTableName()) 
: CacheTag
-        .build(ct.getTableName(), ct.getPartitionDescList());
+  private static CacheTag decodeCacheTag(LlapDaemonProtocolProtos.CacheTag ct, 
Configuration conf) {
+    String tableName = ct.getTableName();
+    String[] parts = tableName.split("\\.");
+    if (parts.length == 2) {
+      // db.table without catalog, prepend current or default catalog
+      tableName = HiveUtils.getCurrentCatalogOrDefault(conf) + '.' + tableName;
+    }
+    return ct.getPartitionDescCount() == 0
+        ? CacheTag.build(tableName)
+        : CacheTag.build(tableName, ct.getPartitionDescList());
   }
 
   @VisibleForTesting
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java 
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index 22407d8a2e3..f26af3d9036 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -324,9 +324,11 @@ public long 
evictEntity(LlapDaemonProtocolProtos.EvictEntityRequestProto protoRe
     if (LOG.isDebugEnabled()) {
       StringBuilder sb = new StringBuilder();
       sb.append(markedBytes).append(" bytes marked for eviction from LLAP 
cache buffers that belong to table(s): ");
-      for (String table : 
request.getEntities().get(request.getSingleDbName()).keySet()) {
-        sb.append(table).append(" ");
-      }
+      request.getEntities().forEach((catalogdb, tables) ->
+          tables.forEach((table, partitions) ->
+            
sb.append(catalogdb.catalog()).append(".").append(catalogdb.database())
+                .append(".").append(table).append(" "))
+      );
       sb.append(" Duration: ").append(time).append(" ms");
       LOG.debug(sb.toString());
     }
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 75a71560b81..76f95a3ec33 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -235,7 +235,7 @@ public OrcEncodedDataReader(LowLevelCache lowLevelCache, 
BufferUsageManager buff
     // LlapInputFormat needs to know the file schema to decide if schema 
evolution is supported.
     PartitionDesc partitionDesc = 
LlapHiveUtils.partitionDescForPath(split.getPath(), parts);
     cacheTag = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_TRACK_CACHE_USAGE)
-        ? LlapHiveUtils.getDbAndTableNameForMetrics(split.getPath(), true, 
partitionDesc) : null;
+        ? LlapHiveUtils.getCacheTag(split.getPath(), true, partitionDesc) : 
null;
     // 1. Get file metadata from cache, or create the reader and read it.
     // Don't cache the filesystem object for now; Tez closes it and FS cache 
will fix all that
     fsSupplier = getFsSupplier(split.getPath(), jobConf);
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
index 3322136366d..e32b0584c88 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
@@ -225,7 +225,7 @@ public MemoryBuffer create() {
     PartitionDesc partitionDesc = 
LlapHiveUtils.partitionDescForPath(split.getPath(), parts);
     fileKey = determineCacheKey(fs, split, partitionDesc, daemonConf);
     cacheTag = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_TRACK_CACHE_USAGE)
-        ? LlapHiveUtils.getDbAndTableNameForMetrics(split.getPath(), true, 
partitionDesc) : null;
+        ? LlapHiveUtils.getCacheTag(split.getPath(), true, partitionDesc) : 
null;
     this.sourceInputFormat = sourceInputFormat;
     this.sourceSerDe = sourceSerDe;
     this.reporter = reporter;
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java
index 02ee55f250e..1c975b623d0 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java
@@ -26,6 +26,7 @@
 import org.apache.hadoop.hive.common.io.DiskRangeList.MutateHelper;
 import org.apache.hadoop.hive.common.io.CacheTag;
 import org.apache.hadoop.hive.llap.cache.EvictionDispatcher;
+import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.llap.cache.LlapCacheableBuffer;
 import org.apache.hadoop.hive.ql.io.SyntheticFileId;
 import org.apache.hadoop.hive.ql.io.orc.encoded.IncompleteCb;
@@ -140,7 +141,6 @@ public boolean isMarkedForEviction() {
 
   @Override
   public CacheTag getTag() {
-    // We don't care about these.
-    return CacheTag.build("OrcEstimates");
+    return CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME, "OrcEstimates");
   }
 }
\ No newline at end of file
diff --git 
a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheContentsTracker.java
 
b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheContentsTracker.java
index 15d3f8fd157..08eb1f45d93 100644
--- 
a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheContentsTracker.java
+++ 
b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheContentsTracker.java
@@ -17,11 +17,10 @@
  */
 package org.apache.hadoop.hive.llap.cache;
 
-import java.util.HashMap;
 import java.util.LinkedHashMap;
-import java.util.Map;
 
 import org.apache.hadoop.hive.common.io.CacheTag;
+import org.apache.hadoop.hive.metastore.Warehouse;
 
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -127,7 +126,7 @@ public void testCacheTagComparison() {
   public void testEncodingDecoding() throws Exception {
     LinkedHashMap<String, String> partDescs = new LinkedHashMap<>();
     partDescs.put("pytha=goras", "a2+b2=c2");
-    CacheTag tag = CacheTag.build("math.rules", partDescs);
+    CacheTag tag = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME, 
"math.rules", partDescs);
     CacheTag.SinglePartitionCacheTag stag = 
((CacheTag.SinglePartitionCacheTag)tag);
     assertEquals("pytha=goras=a2+b2=c2", stag.partitionDescToString());
     assertEquals(1, stag.getPartitionDescMap().size());
@@ -136,7 +135,7 @@ public void testEncodingDecoding() throws Exception {
     partDescs.clear();
     partDescs.put("mutli=one", "one=/1");
     partDescs.put("mutli=two/", "two=2");
-    tag = CacheTag.build("math.rules", partDescs);
+    tag = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME, "math.rules", 
partDescs);
     CacheTag.MultiPartitionCacheTag mtag = 
((CacheTag.MultiPartitionCacheTag)tag);
     assertEquals("mutli=one=one=/1/mutli=two/=two=2", 
mtag.partitionDescToString());
     assertEquals(2, mtag.getPartitionDescMap().size());
@@ -168,6 +167,10 @@ private static LlapCacheableBuffer createMockBuffer(long 
size, CacheTag cacheTag
   }
 
   public static CacheTag cacheTagBuilder(String dbAndTable, String... 
partitions) {
+    String[] parts = dbAndTable.split("\\.");
+    if(parts.length < 3) {
+      dbAndTable = Warehouse.DEFAULT_CATALOG_NAME + "." + dbAndTable;
+    }
     if (partitions != null && partitions.length > 0) {
       LinkedHashMap<String, String> partDescs = new LinkedHashMap<>();
       for (String partition : partitions) {
@@ -215,33 +218,33 @@ private static void evictSomeTestBuffers() {
   private static final String EXPECTED_CACHE_STATE_WHEN_FULL =
       "\n" +
           "Cache state: \n" +
-          "default : 2/2, 2101248/2101248\n" +
-          "default.testtable : 2/2, 2101248/2101248\n" +
-          "otherdb : 7/7, 1611106304/1611106304\n" +
-          "otherdb.testtable : 4/4, 231424/231424\n" +
-          "otherdb.testtable/p=v1 : 3/3, 100352/100352\n" +
-          "otherdb.testtable/p=v1/pp=vv1 : 2/2, 34816/34816\n" +
-          "otherdb.testtable/p=v1/pp=vv2 : 1/1, 65536/65536\n" +
-          "otherdb.testtable/p=v2 : 1/1, 131072/131072\n" +
-          "otherdb.testtable/p=v2/pp=vv1 : 1/1, 131072/131072\n" +
-          "otherdb.testtable2 : 2/2, 537133056/537133056\n" +
-          "otherdb.testtable2/p=v3 : 2/2, 537133056/537133056\n" +
-          "otherdb.testtable3 : 1/1, 1073741824/1073741824";
+          "hive.default : 2/2, 2101248/2101248\n" +
+          "hive.default.testtable : 2/2, 2101248/2101248\n" +
+          "hive.otherdb : 7/7, 1611106304/1611106304\n" +
+          "hive.otherdb.testtable : 4/4, 231424/231424\n" +
+          "hive.otherdb.testtable/p=v1 : 3/3, 100352/100352\n" +
+          "hive.otherdb.testtable/p=v1/pp=vv1 : 2/2, 34816/34816\n" +
+          "hive.otherdb.testtable/p=v1/pp=vv2 : 1/1, 65536/65536\n" +
+          "hive.otherdb.testtable/p=v2 : 1/1, 131072/131072\n" +
+          "hive.otherdb.testtable/p=v2/pp=vv1 : 1/1, 131072/131072\n" +
+          "hive.otherdb.testtable2 : 2/2, 537133056/537133056\n" +
+          "hive.otherdb.testtable2/p=v3 : 2/2, 537133056/537133056\n" +
+          "hive.otherdb.testtable3 : 1/1, 1073741824/1073741824";
 
   private static final String EXPECTED_CACHE_STATE_AFTER_EVICTION =
       "\n" +
           "Cache state: \n" +
-          "default : 0/2, 0/2101248\n" +
-          "default.testtable : 0/2, 0/2101248\n" +
-          "otherdb : 5/7, 1074202624/1611106304\n" +
-          "otherdb.testtable : 3/4, 198656/231424\n" +
-          "otherdb.testtable/p=v1 : 2/3, 67584/100352\n" +
-          "otherdb.testtable/p=v1/pp=vv1 : 1/2, 2048/34816\n" +
-          "otherdb.testtable/p=v1/pp=vv2 : 1/1, 65536/65536\n" +
-          "otherdb.testtable/p=v2 : 1/1, 131072/131072\n" +
-          "otherdb.testtable/p=v2/pp=vv1 : 1/1, 131072/131072\n" +
-          "otherdb.testtable2 : 1/2, 262144/537133056\n" +
-          "otherdb.testtable2/p=v3 : 1/2, 262144/537133056\n" +
-          "otherdb.testtable3 : 1/1, 1073741824/1073741824";
+          "hive.default : 0/2, 0/2101248\n" +
+          "hive.default.testtable : 0/2, 0/2101248\n" +
+          "hive.otherdb : 5/7, 1074202624/1611106304\n" +
+          "hive.otherdb.testtable : 3/4, 198656/231424\n" +
+          "hive.otherdb.testtable/p=v1 : 2/3, 67584/100352\n" +
+          "hive.otherdb.testtable/p=v1/pp=vv1 : 1/2, 2048/34816\n" +
+          "hive.otherdb.testtable/p=v1/pp=vv2 : 1/1, 65536/65536\n" +
+          "hive.otherdb.testtable/p=v2 : 1/1, 131072/131072\n" +
+          "hive.otherdb.testtable/p=v2/pp=vv1 : 1/1, 131072/131072\n" +
+          "hive.otherdb.testtable2 : 1/2, 262144/537133056\n" +
+          "hive.otherdb.testtable2/p=v3 : 1/2, 262144/537133056\n" +
+          "hive.otherdb.testtable3 : 1/1, 1073741824/1073741824";
 
 }
diff --git 
a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestFileCache.java 
b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestFileCache.java
index 34203ddf5d6..f5bb1e0d254 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestFileCache.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestFileCache.java
@@ -19,6 +19,7 @@
 
 import com.google.common.base.Function;
 import org.apache.hadoop.hive.common.io.CacheTag;
+import org.apache.hadoop.hive.metastore.Warehouse;
 import org.junit.Test;
 
 import java.util.concurrent.ConcurrentHashMap;
@@ -32,7 +33,7 @@ public void testFileCacheMetadata() {
     ConcurrentHashMap<Object, FileCache<Object>> cache = new 
ConcurrentHashMap<>();
     Object fileKey = 1234L;
     Function<Void, Object> f = a -> new Object();
-    CacheTag tag = CacheTag.build("test_table");
+    CacheTag tag = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME, 
"test_db.test_table");
 
     FileCache<Object> result = FileCache.getOrAddFileSubCache(cache, fileKey, 
f, tag);
 
diff --git 
a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
 
b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
index 4e3c10ed6b2..764dd9ec319 100644
--- 
a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
+++ 
b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
@@ -39,6 +39,7 @@
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.hive.common.io.CacheTag;
+import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.common.io.DiskRange;
 import org.apache.hadoop.hive.common.io.DiskRangeList;
 import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory;
@@ -309,13 +310,14 @@ private void _testProactiveEvictionMark(boolean 
isInstantDeallocation) {
 
     LlapDataBuffer[] buffs1 = IntStream.range(0, 4).mapToObj(i -> 
fb()).toArray(LlapDataBuffer[]::new);
     DiskRange[] drs1 = drs(IntStream.range(1, 5).toArray());
-    CacheTag tag1 = CacheTag.build("default.table1");
+    CacheTag tag1 = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME, 
"default.table1");
 
     LlapDataBuffer[] buffs2 = IntStream.range(0, 41).mapToObj(i -> 
fb()).toArray(LlapDataBuffer[]::new);
     DiskRange[] drs2 = drs(IntStream.range(1, 42).toArray());
-    CacheTag tag2 = CacheTag.build("default.table2");
+    CacheTag tag2 = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME, 
"default.table2");
 
-    Predicate<CacheTag> predicate = tag -> 
"default.table1".equals(tag.getTableName());
+    Predicate<CacheTag> predicate = tag ->
+        (Warehouse.DEFAULT_CATALOG_NAME + "." + 
"default.table1").equals(tag.getTableName());
 
     cache.putFileData(fn1, drs1, buffs1, 0, Priority.NORMAL, null, tag1);
     cache.putFileData(fn2, drs2, buffs2, 0, Priority.NORMAL, null, tag2);
diff --git 
a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
 
b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
index 62a8c747439..5ec15beccfb 100644
--- 
a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
+++ 
b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
@@ -37,6 +37,7 @@
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.IllegalCacheConfigurationException;
+import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
 import org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader;
 import org.apache.hadoop.hive.llap.io.metadata.MetadataCache;
@@ -249,7 +250,7 @@ public void testGetOrcTailForPath() throws Exception {
     Path path = new Path("../data/files/alltypesorc");
     Configuration jobConf = new Configuration();
     Configuration daemonConf = new Configuration();
-    CacheTag tag = CacheTag.build("test-table");
+    CacheTag tag = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME, 
"test-db.test-table");
     OrcTail uncached = OrcEncodedDataReader.getOrcTailForPath(path, jobConf, 
tag, daemonConf, cache, null);
     jobConf.set(HiveConf.ConfVars.LLAP_IO_CACHE_ONLY.varname, "true");
     OrcTail cached = OrcEncodedDataReader.getOrcTailForPath(path, jobConf, 
tag, daemonConf, cache, null);
@@ -270,7 +271,7 @@ public void testGetOrcTailForPathWithFileId() throws 
Exception {
     Path path = new Path("../data/files/alltypesorc");
     Configuration jobConf = new Configuration();
     Configuration daemonConf = new Configuration();
-    CacheTag tag = CacheTag.build("test-table");
+    CacheTag tag = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME, 
"test-db.test-table");
     FileSystem fs = FileSystem.get(daemonConf);
     FileStatus fileStatus = fs.getFileStatus(path);
     OrcTail uncached = 
OrcEncodedDataReader.getOrcTailForPath(fileStatus.getPath(), jobConf, tag, 
daemonConf, cache, new SyntheticFileId(fileStatus));
@@ -294,7 +295,7 @@ public void testGetOrcTailForPathWithFileIdChange() throws 
Exception {
     Path path = new Path("../data/files/alltypesorc");
     Configuration jobConf = new Configuration();
     Configuration daemonConf = new Configuration();
-    CacheTag tag = CacheTag.build("test-table");
+    CacheTag tag = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME, 
"test-db.test-table");
     OrcEncodedDataReader.getOrcTailForPath(path, jobConf, tag, daemonConf, 
cache, new SyntheticFileId(path, 100, 100));
     jobConf.set(HiveConf.ConfVars.LLAP_IO_CACHE_ONLY.varname, "true");
     Exception ex = null;
@@ -337,19 +338,23 @@ public void testProactiveEvictionMark() throws Exception {
     // below is of length 65
     ByteBuffer bb2 = 
ByteBuffer.wrap("-large-meta-data-content-large-meta-data-content-large-meta-data-".getBytes());
 
-    LlapBufferOrBuffers table1Buffers1 = cache.putFileMetadata(fn1, bb, 
CacheTag.build("default.table1"), isStopped);
+    LlapBufferOrBuffers table1Buffers1 = cache.putFileMetadata(fn1, bb,
+        CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME, "default.table1"), 
isStopped);
     assertNotNull(table1Buffers1.getSingleLlapBuffer());
 
-    LlapBufferOrBuffers table1Buffers2 = cache.putFileMetadata(fn2, bb2, 
CacheTag.build("default.table1"), isStopped);
+    LlapBufferOrBuffers table1Buffers2 = cache.putFileMetadata(fn2, bb2,
+        CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME, "default.table1"), 
isStopped);
     assertNotNull(table1Buffers2.getMultipleLlapBuffers());
     assertEquals(2, table1Buffers2.getMultipleLlapBuffers().length);
 
     // Case for when metadata consists of just 1 buffer (most of the realworld 
cases)
     ByteBuffer bb3 = 
ByteBuffer.wrap("small-meta-data-content-for-otherFile".getBytes());
-    LlapBufferOrBuffers table2Buffers1 = cache.putFileMetadata(fn3, bb3, 
CacheTag.build("default.table2"), isStopped);
+    LlapBufferOrBuffers table2Buffers1 = cache.putFileMetadata(fn3, bb3,
+        CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME, "default.table2"), 
isStopped);
     assertNotNull(table2Buffers1.getSingleLlapBuffer());
 
-    Predicate<CacheTag> predicate = tag -> 
"default.table1".equals(tag.getTableName());
+    Predicate<CacheTag> predicate = tag ->
+        (Warehouse.DEFAULT_CATALOG_NAME + 
".default.table1").equals(tag.getTableName());
 
     // Simulating eviction on some buffers
     table1Buffers2.getMultipleLlapBuffers()[1].decRef();
diff --git 
a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestProactiveEviction.java
 
b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestProactiveEviction.java
index 89c8f605503..e75237b8765 100644
--- 
a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestProactiveEviction.java
+++ 
b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestProactiveEviction.java
@@ -22,6 +22,8 @@
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -32,7 +34,9 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.ProactiveEviction.Request;
 import org.apache.hadoop.hive.llap.ProactiveEviction.Request.Builder;
+import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EvictEntityRequestProto;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
+import org.apache.hadoop.hive.metastore.Warehouse;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -42,6 +46,7 @@
 import static 
org.apache.hadoop.hive.llap.cache.TestCacheContentsTracker.cacheTagBuilder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -106,6 +111,328 @@ private static void assertMatchOnTags(Builder 
requestBuilder, String expected) {
     assertEquals(expected, sb.toString());
   }
 
+  /**
+   * Verifies that passing an explicit catalog produces correct matching via 
isTagMatch.
+   * TEST_TAGS all belong to the default catalog, so requests for a different 
catalog must not match.
+   */
+  @Test
+  public void testCatalogAwareCacheTagAndRequestMatching() {
+    // Default catalog matches as expected.
+    assertMatchOnTags(Builder.create().addDb("fx"), "111111111111000000");
+    assertMatchOnTags(Builder.create().addTable("fx", "futures"), 
"000001111000000000");
+    assertMatchOnTags(Builder.create().addPartitionOfATable("fx", "futures",
+            buildParts("ccy", "JPY")), "000000110000000000");
+    assertMatchOnTags(Builder.create().addTable("fixedincome", "bonds"), 
"000000000000000110");
+    assertMatchOnTags(Builder.create().addPartitionOfATable("fx", "rates",
+            buildParts("from", "EUR", "to", "HUF")), "000010000000000000");
+
+    // Non-default catalog: CacheTag now carries catalog info, so none of the 
TEST_TAGS
+    // (all default-catalog) should match requests targeting a different 
catalog.
+    assertMatchOnTags(Builder.create().addDb("custom_catalog", "fx"), 
"000000000000000000");
+    assertMatchOnTags(Builder.create().addTable("custom_catalog", "equity", 
"prices"),
+        "000000000000000000");
+    assertMatchOnTags(Builder.create().addPartitionOfATable(
+        "custom_catalog", "equity", "prices", buildParts("ex", "NYSE")),
+        "000000000000000000");
+  }
+
+  /**
+   * Verifies that catalog_name is serialized into the proto and correctly 
restored via fromProtoRequest.
+   */
+  @Test
+  public void testProtoRoundTripPreservesCatalog() {
+    // Default catalog is always serialized into the proto.
+    Request defaultCatRequest = Builder.create().addDb("testdb").build();
+    List<EvictEntityRequestProto> protos = defaultCatRequest.toProtoRequests();
+    assertEquals(1, protos.size());
+    EvictEntityRequestProto proto = protos.get(0);
+    assertEquals(Warehouse.DEFAULT_CATALOG_NAME, proto.getCatalogName());
+    assertEquals("testdb", proto.getDbName());
+
+    Request roundTripped = Builder.create().fromProtoRequest(proto).build();
+    assertTrue(roundTripped.hasDatabaseName(Warehouse.DEFAULT_CATALOG_NAME, 
"testdb"));
+
+    // Custom catalog is also preserved.
+    Request customCatRequest = Builder.create().addTable("spark_catalog", 
"salesdb", "orders").build();
+    protos = customCatRequest.toProtoRequests();
+    assertEquals(1, protos.size());
+    proto = protos.get(0);
+    assertEquals("spark_catalog", proto.getCatalogName());
+    assertEquals("salesdb", proto.getDbName());
+
+    roundTripped = Builder.create().fromProtoRequest(proto).build();
+    assertTrue(roundTripped.hasDatabaseName("spark_catalog", "salesdb"));
+  }
+
+  /**
+   * Verifies that entities in different catalogs are independently scoped 
even when they share
+   * the same DB name.
+   */
+  @Test
+  public void testMultiCatalogBuilderScoping() {
+    // Two different catalogs, each with the same DB name but different tables.
+    Request request = Builder.create()
+        .addTable("catalog_a", "shared_db", "table_a")
+        .addTable("catalog_b", "shared_db", "table_b")
+        .build();
+
+    assertEquals(2, request.getEntities().size());
+    assertTrue(request.getEntities().containsKey(new 
Request.CatalogDb("catalog_a", "shared_db")));
+    assertTrue(request.getEntities().containsKey(new 
Request.CatalogDb("catalog_b", "shared_db")));
+
+    // catalog_a only knows about table_a.
+    assertTrue(request.getEntities().get(new Request.CatalogDb("catalog_a", 
"shared_db")).containsKey("table_a"));
+    assertFalse(request.getEntities().get(new Request.CatalogDb("catalog_a", 
"shared_db")).containsKey("table_b"));
+
+    // catalog_b only knows about table_b.
+    assertTrue(request.getEntities().get(new Request.CatalogDb("catalog_b", 
"shared_db")).containsKey("table_b"));
+    assertFalse(request.getEntities().get(new Request.CatalogDb("catalog_b", 
"shared_db")).containsKey("table_a"));
+  }
+
+  /**
+   * Verifies that multiple tables and partitions added to the same catalog+DB 
are merged
+   * into a single catalog entry (no duplication).
+   */
+  @Test
+  public void testSameCatalogMultipleEntitiesMergedCorrectly() {
+    Request request = Builder.create()
+        .addTable("mydb", "table1")
+        .addTable("mydb", "table2")
+        .addPartitionOfATable("mydb", "table3", buildParts("dt", "2024-01-01"))
+        .addPartitionOfATable("mydb", "table3", buildParts("dt", "2024-01-02"))
+        .build();
+
+    assertTrue(request.hasDatabaseName(Warehouse.DEFAULT_CATALOG_NAME, 
"mydb"));
+    // One catalog, one DB, three tables.
+    assertEquals(1, request.getEntities().size());
+    assertEquals(3, request.getEntities()
+        .get(new Request.CatalogDb(Warehouse.DEFAULT_CATALOG_NAME, 
"mydb")).size());
+    // table3 has two partition specs.
+    assertEquals(2, request.getEntities()
+        .get(new Request.CatalogDb(Warehouse.DEFAULT_CATALOG_NAME, 
"mydb")).get("table3").size());
+  }
+
+  /**
+   * Verifies that CacheTag catalog information is correctly used to isolate 
eviction between catalogs.
+   * A request targeting catalog A must not evict buffers that belong to 
catalog B, even when the
+   * DB and table names are identical.
+   */
+  @Test
+  public void testCatalogIsolationInIsTagMatch() {
+    CacheTag defaultCatalogTag = cacheTagBuilder("fx.rates", "from=USD", 
"to=HUF");
+    CacheTag otherCatalogTag = cacheTagBuilder("other_catalog.fx.rates", 
"from=USD", "to=HUF");
+
+    // Request for the default catalog's "fx" DB matches only default-catalog 
tags.
+    Request defaultCatalogRequest = Builder.create()
+        .fromProtoRequest(Builder.create()
+            .addDb("fx")
+            .build().toProtoRequests().get(0))
+        .build();
+    assertTrue(defaultCatalogRequest.isTagMatch(defaultCatalogTag));
+    assertFalse("Must not evict buffers belonging to other_catalog",
+        defaultCatalogRequest.isTagMatch(otherCatalogTag));
+
+    // Request for a different catalog matches only tags from that catalog.
+    Request otherCatalogRequest = Builder.create()
+        .fromProtoRequest(Builder.create()
+            .addDb("other_catalog", "fx")
+            .build().toProtoRequests().get(0))
+        .build();
+    assertTrue(otherCatalogRequest.isTagMatch(otherCatalogTag));
+    assertFalse("Must not evict buffers belonging to the default catalog",
+        otherCatalogRequest.isTagMatch(defaultCatalogTag));
+
+    // A request for a DB that doesn't exist in the tags must not match, 
regardless of catalog.
+    Request noMatchRequest = Builder.create()
+        .fromProtoRequest(Builder.create()
+            .addDb("any_catalog", "nonexistent_db")
+            .build().toProtoRequests().get(0))
+        .build();
+    assertFalse(noMatchRequest.isTagMatch(defaultCatalogTag));
+    assertFalse(noMatchRequest.isTagMatch(otherCatalogTag));
+  }
+
+  /**
+   * Verifies that Iceberg metadata table cache tags 
(catalog.db.table.metaTable) are handled by
+   * isTagMatch and evicted when the base table is dropped.
+   */
+  @Test
+  public void testIcebergMetaTableTagMatching() {
+    CacheTag baseTableTag = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME + 
".salesdb.orders");
+    CacheTag filesMetaTag = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME + 
".salesdb.orders.files");
+    CacheTag snapshotsMetaTag = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME 
+ ".salesdb.orders.snapshots");
+    CacheTag otherTableMetaTag = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME 
+ ".salesdb.other.files");
+
+    Request dropTableRequest = Builder.create()
+        .fromProtoRequest(Builder.create()
+            .addTable("salesdb", "orders")
+            .build().toProtoRequests().get(0))
+        .build();
+
+    assertTrue(dropTableRequest.isTagMatch(baseTableTag));
+    assertTrue(dropTableRequest.isTagMatch(filesMetaTag));
+    assertTrue(dropTableRequest.isTagMatch(snapshotsMetaTag));
+    assertFalse(dropTableRequest.isTagMatch(otherTableMetaTag));
+
+    // Drop-partition requests must not evict metadata-table cache via prefix 
matching.
+    Request dropPartitionRequest = Builder.create()
+        .fromProtoRequest(Builder.create()
+            .addPartitionOfATable("salesdb", "orders", buildParts("dt", 
"2024-01-01"))
+            .build().toProtoRequests().get(0))
+        .build();
+    assertFalse(dropPartitionRequest.isTagMatch(filesMetaTag));
+  }
+
+  /**
+   * Legacy cache tags created before catalog support are 2-part (db.table) 
with no catalog
+   * component. They must be treated as belonging to the default catalog.
+   */
+  @Test
+  public void testTwoPartLegacyTagMatching() {
+    CacheTag tableTag = CacheTag.build("salesdb.orders");
+    CacheTag partitionedTag = CacheTag.build("salesdb.orders", 
buildParts("dt", "2024-01-01"));
+
+    // Drop database, table and matching partition (all default catalog) evict 
the legacy tags.
+    assertTrue(dropDbRequest(Warehouse.DEFAULT_CATALOG_NAME, 
"salesdb").isTagMatch(tableTag));
+    assertTrue(dropTableRequest(Warehouse.DEFAULT_CATALOG_NAME, "salesdb", 
"orders").isTagMatch(tableTag));
+    assertTrue(dropPartitionRequest(Warehouse.DEFAULT_CATALOG_NAME, "salesdb", 
"orders",
+        buildParts("dt", "2024-01-01")).isTagMatch(partitionedTag));
+
+    // A non-matching partition value must not evict.
+    assertFalse(dropPartitionRequest(Warehouse.DEFAULT_CATALOG_NAME, 
"salesdb", "orders",
+        buildParts("dt", "2024-01-02")).isTagMatch(partitionedTag));
+
+    // A request scoped to a custom catalog must not evict default-catalog 
legacy tags.
+    assertFalse(dropDbRequest("custom_catalog", 
"salesdb").isTagMatch(tableTag));
+    assertFalse(dropTableRequest("custom_catalog", "salesdb", 
"orders").isTagMatch(tableTag));
+  }
+
+  /**
+   * Iceberg metadata table tags (catalog.db.table.metaTable) on a non-default 
catalog must be
+   * evicted only by requests scoped to that same catalog.
+   */
+  @Test
+  public void testNonDefaultCatalogIcebergMetaTableMatching() {
+    CacheTag filesMetaTag = 
CacheTag.build("spark_catalog.salesdb.orders.files");
+    CacheTag snapshotsMetaTag = 
CacheTag.build("spark_catalog.salesdb.orders.snapshots");
+
+    // Drop table on the custom catalog evicts its metadata-table cache.
+    Request dropTable = dropTableRequest("spark_catalog", "salesdb", "orders");
+    assertTrue(dropTable.isTagMatch(filesMetaTag));
+    assertTrue(dropTable.isTagMatch(snapshotsMetaTag));
+
+    // Drop database on the custom catalog evicts metadata-table cache too.
+    Request dropDb = dropDbRequest("spark_catalog", "salesdb");
+    assertTrue(dropDb.isTagMatch(filesMetaTag));
+    assertTrue(dropDb.isTagMatch(snapshotsMetaTag));
+
+    // The same logical name in the default catalog must not be evicted.
+    assertFalse(dropTableRequest(Warehouse.DEFAULT_CATALOG_NAME, "salesdb", 
"orders")
+        .isTagMatch(filesMetaTag));
+    assertFalse(dropDbRequest(Warehouse.DEFAULT_CATALOG_NAME, 
"salesdb").isTagMatch(filesMetaTag));
+  }
+
+  /**
+   * Dropping a database must evict both base-table and Iceberg metadata-table 
cache entries
+   * belonging to that database.
+   */
+  @Test
+  public void testDropDatabaseEvictsMetaTableTags() {
+    CacheTag baseTag = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME + 
".salesdb.orders");
+    CacheTag filesMetaTag = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME + 
".salesdb.orders.files");
+
+    Request dropDb = dropDbRequest(Warehouse.DEFAULT_CATALOG_NAME, "salesdb");
+    assertTrue(dropDb.isTagMatch(baseTag));
+    assertTrue(dropDb.isTagMatch(filesMetaTag));
+
+    // A database with a different name must not match.
+    assertFalse(dropDbRequest(Warehouse.DEFAULT_CATALOG_NAME, 
"otherdb").isTagMatch(filesMetaTag));
+  }
+
+  /**
+   * Prefix matching used for Iceberg metadata tables must not produce false 
positives for tables
+   * that merely share a name prefix with the dropped table.
+   */
+  @Test
+  public void testDropTablePrefixMatchingAvoidsFalsePositives() {
+    CacheTag siblingTableTag = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME + 
".salesdb.orders_archive");
+    CacheTag siblingMetaTag =
+        CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME + 
".salesdb.orders_archive.files");
+
+    Request dropTable = dropTableRequest(Warehouse.DEFAULT_CATALOG_NAME, 
"salesdb", "orders");
+    assertFalse(dropTable.isTagMatch(siblingTableTag));
+    assertFalse(dropTable.isTagMatch(siblingMetaTag));
+  }
+
+  /**
+   * With catalog-aware tags a 3-part name is always interpreted as 
catalog.db.table, never as a
+   * default-catalog db.table.metaTable.
+   */
+  @Test
+  public void testThreePartTagInterpretedAsCatalogQualified() {
+    CacheTag tag = CacheTag.build("custom_catalog.salesdb.orders");
+
+    // Matched when the request targets the same catalog + db + table.
+    assertTrue(dropTableRequest("custom_catalog", "salesdb", 
"orders").isTagMatch(tag));
+
+    // Not matched when "custom_catalog" is mistaken for a database in the 
default catalog.
+    assertFalse(dropTableRequest(Warehouse.DEFAULT_CATALOG_NAME, 
"custom_catalog", "salesdb")
+        .isTagMatch(tag));
+  }
+
+  /**
+   * Snapshot-ref tags (branch_/tag_) must be evicted when their base table is 
dropped, both in the
+   * 4-part catalog-qualified form and in the legacy 3-part db.table.ref form.
+   */
+  @Test
+  public void testSnapshotRefTagMatching() {
+    CacheTag branchTag = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME + 
".salesdb.orders.branch_main");
+    CacheTag tagRefTag = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME + 
".salesdb.orders.tag_v1");
+    // Legacy 3-part snapshot ref without catalog prefix -> default-catalog 
db.table.ref.
+    CacheTag legacyBranchTag = CacheTag.build("salesdb.orders.branch_main");
+
+    Request dropTable = dropTableRequest(Warehouse.DEFAULT_CATALOG_NAME, 
"salesdb", "orders");
+    assertTrue(dropTable.isTagMatch(branchTag));
+    assertTrue(dropTable.isTagMatch(tagRefTag));
+    assertTrue(dropTable.isTagMatch(legacyBranchTag));
+  }
+
+  /**
+   * Cache tag names must have between 2 and 4 dot-separated components; 
anything else is rejected.
+   */
+  @Test
+  public void testInvalidCacheTagLengthThrows() {
+    Request request = dropDbRequest(Warehouse.DEFAULT_CATALOG_NAME, "salesdb");
+
+    // Single-component tag is not a valid db-qualified name.
+    assertThrows(UnsupportedOperationException.class,
+        () -> request.isTagMatch(CacheTag.build("orders")));
+
+    // Five-component tag exceeds the supported catalog.db.table.metaTable 
structure.
+    assertThrows(UnsupportedOperationException.class,
+        () -> request.isTagMatch(CacheTag.build("a.b.c.d.e")));
+  }
+
+  private static Request dropDbRequest(String catalog, String db) {
+    return roundTrip(Builder.create().addDb(catalog, db));
+  }
+
+  private static Request dropTableRequest(String catalog, String db, String 
table) {
+    return roundTrip(Builder.create().addTable(catalog, db, table));
+  }
+
+  private static Request dropPartitionRequest(String catalog, String db, 
String table,
+      Map<String, String> partSpec) {
+    return roundTrip(Builder.create().addPartitionOfATable(catalog, db, table, 
partSpec));
+  }
+
+  /**
+   * Marshals the request to proto and back, mirroring how the LLAP daemon 
receives requests.
+   */
+  private static Request roundTrip(Builder requestBuilder) {
+    return 
Builder.create().fromProtoRequest(requestBuilder.build().toProtoRequests().get(0)).build();
+  }
+
   @Test
   public void testProactiveSweep() throws Exception {
     closeSweeperExecutorForTest();
diff --git 
a/llap-server/src/test/org/apache/hadoop/hive/llap/io/api/impl/TestLlapCacheMetadataSerializer.java
 
b/llap-server/src/test/org/apache/hadoop/hive/llap/io/api/impl/TestLlapCacheMetadataSerializer.java
index 01581dd94a7..6082f57d065 100644
--- 
a/llap-server/src/test/org/apache/hadoop/hive/llap/io/api/impl/TestLlapCacheMetadataSerializer.java
+++ 
b/llap-server/src/test/org/apache/hadoop/hive/llap/io/api/impl/TestLlapCacheMetadataSerializer.java
@@ -107,7 +107,7 @@ private LlapDaemonProtocolProtos.CacheEntryList 
createDummyMetadata() throws IOE
     LlapDaemonProtocolProtos.CacheEntryRange re2 =
         
LlapDaemonProtocolProtos.CacheEntryRange.newBuilder().setStart(14L).setEnd(38L).build();
     LlapDaemonProtocolProtos.CacheTag ct =
-        
LlapDaemonProtocolProtos.CacheTag.newBuilder().setTableName("dummyTable").build();
+        
LlapDaemonProtocolProtos.CacheTag.newBuilder().setTableName("hive.default.dummyTable").build();
 
     Path path = new Path(TEST_PATH);
     SyntheticFileId syntheticFileId = fileId(path);
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java 
b/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java
index ba62b8d89c2..76034ca69a5 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.io.CacheTag;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
 import org.apache.hadoop.hive.ql.io.HdfsUtils;
@@ -74,18 +75,21 @@ public static PartitionDesc partitionDescForPath(Path path, 
Map<Path, PartitionD
     return part;
   }
 
-  public static CacheTag getDbAndTableNameForMetrics(Path path, boolean 
includeParts,
-      PartitionDesc part) {
-
-    // Fallback to legacy cache tag creation logic.
+  /**
+   * Builds a {@link CacheTag} for the given path and partition descriptor.
+   * The catalog name is derived from the {@link PartitionDesc} when 
available, falling back
+   * to {@link Warehouse#DEFAULT_CATALOG_NAME} when {@code part} is null.
+   */
+  public static CacheTag getCacheTag(Path path, boolean includeParts, 
PartitionDesc part) {
     if (part == null) {
-      return CacheTag.build(LlapUtil.getDbAndTableNameForMetrics(path, 
includeParts));
+      return CacheTag.build(
+          Warehouse.DEFAULT_CATALOG_NAME, 
LlapUtil.getDbAndTableNameForMetrics(path, includeParts));
     }
-
+    String catalogName = part.getCatalogName();
     if (!includeParts || !part.isPartitioned()) {
-      return CacheTag.build(part.getTableName());
+      return CacheTag.build(catalogName, part.getTableName());
     } else {
-      return CacheTag.build(part.getTableName(), part.getPartSpec());
+      return CacheTag.build(catalogName, part.getTableName(), 
part.getPartSpec());
     }
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/ProactiveEviction.java 
b/ql/src/java/org/apache/hadoop/hive/llap/ProactiveEviction.java
index 120949fc994..b1fcf31a28f 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/ProactiveEviction.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/ProactiveEviction.java
@@ -21,9 +21,7 @@
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -40,6 +38,7 @@
 import org.apache.hadoop.hive.llap.impl.LlapManagementProtocolClientImpl;
 import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
 import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
+import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.net.NetUtils;
@@ -139,7 +138,8 @@ public void run() {
 
         long evictedBytes = 0;
         for (LlapDaemonProtocolProtos.EvictEntityRequestProto protoRequest : 
protoRequests) {
-          LOG.debug("Requesting proactive eviction for entities in database 
{}", protoRequest.getDbName());
+          LOG.debug("Requesting proactive eviction for entities in catalog {}, 
database {}",
+              protoRequest.getCatalogName(), protoRequest.getDbName());
           LlapDaemonProtocolProtos.EvictEntityResponseProto response = 
client.evictEntity(null, protoRequest);
           evictedBytes += response.getEvictedBytes();
           LOG.debug("Proactively evicted {} bytes", 
response.getEvictedBytes());
@@ -152,19 +152,19 @@ public void run() {
   }
 
   /**
-   * Holds information on entities: DB name(s), table name(s), partitions.
+   * Holds information on entities: catalog name(s), DB name(s), table 
name(s), partitions.
    */
   public static final class Request {
 
-    // Holds a hierarchical structure of DBs, tables and partitions such as:
-    // { testdb : { testtab0 : [], testtab1 : [ {pk0 : p0v0, pk1 : p0v1}, {pk0 
: p1v0, pk1 : p1v1} ] }, testdb2 : {} }
-    private final Map<String, Map<String, Set<LinkedHashMap<String, String>>>> 
entities;
+    public record PartitionSpec(Map<String, String> spec) {}
+    public record CatalogDb(String catalog, String database){}
+    private final Map<CatalogDb, Map<String, Set<PartitionSpec>>> entities;
 
-    private Request(Map<String, Map<String, Set<LinkedHashMap<String, 
String>>>> entities) {
+    private Request(Map<CatalogDb, Map<String, Set<PartitionSpec>>> entities) {
       this.entities = entities;
     }
 
-    public Map<String, Map<String, Set<LinkedHashMap<String, String>>>> 
getEntities() {
+    public Map<CatalogDb, Map<String, Set<PartitionSpec>>> getEntities() {
       return entities;
     }
 
@@ -172,15 +172,8 @@ public boolean isEmpty() {
       return entities.isEmpty();
     }
 
-    /**
-     * Request often times only contains tables/partitions of 1 DB only.
-     * @return the single DB name, null if the count of DBs present is not 
exactly 1.
-     */
-    public String getSingleDbName() {
-      if (entities.size() == 1) {
-        return entities.keySet().stream().findFirst().get();
-      }
-      return null;
+    public boolean hasDatabaseName(String catalogName, String dbName) {
+      return entities.containsKey(new CatalogDb(catalogName, dbName));
     }
 
     /**
@@ -188,41 +181,39 @@ public String getSingleDbName() {
      * @return list of request instances ready to be sent over protobuf.
      */
     public List<LlapDaemonProtocolProtos.EvictEntityRequestProto> 
toProtoRequests() {
-
-      List<LlapDaemonProtocolProtos.EvictEntityRequestProto> protoRequests = 
new LinkedList<>();
-
-      for (Map.Entry<String, Map<String, Set<LinkedHashMap<String, String>>>> 
dbEntry : entities.entrySet()) {
-        String dbName = dbEntry.getKey();
-        Map<String, Set<LinkedHashMap<String, String>>> tables = 
dbEntry.getValue();
-
-        LlapDaemonProtocolProtos.EvictEntityRequestProto.Builder 
requestBuilder =
-            LlapDaemonProtocolProtos.EvictEntityRequestProto.newBuilder();
-        LlapDaemonProtocolProtos.TableProto.Builder tableBuilder = null;
-
-        requestBuilder.setDbName(dbName.toLowerCase());
-        for (Map.Entry<String, Set<LinkedHashMap<String, String>>> tableEntry 
: tables.entrySet()) {
-          String tableName = tableEntry.getKey();
-          tableBuilder = LlapDaemonProtocolProtos.TableProto.newBuilder();
-          tableBuilder.setTableName(tableName.toLowerCase());
-
-          Set<LinkedHashMap<String, String>> partitions = 
tableEntry.getValue();
-          Set<String> partitionKeys = null;
-
-          for (Map<String, String> partitionSpec : partitions) {
-            if (partitionKeys == null) {
+      return entities.entrySet().stream()
+          .map(entry -> {
+            CatalogDb catalogDb = entry.getKey();
+            Map<String, Set<PartitionSpec>> tables = entry.getValue();
+            LlapDaemonProtocolProtos.EvictEntityRequestProto.Builder 
requestBuilder =
+                LlapDaemonProtocolProtos.EvictEntityRequestProto.newBuilder();
+
+            requestBuilder.setCatalogName(catalogDb.catalog().toLowerCase());
+            requestBuilder.setDbName(catalogDb.database().toLowerCase());
+
+            tables.forEach((tableName, partitions) -> {
+              LlapDaemonProtocolProtos.TableProto.Builder tableBuilder =
+                  LlapDaemonProtocolProtos.TableProto.newBuilder();
+
+              tableBuilder.setTableName(tableName.toLowerCase());
+
+              Set<String> partitionKeys = null;
+
+              for (PartitionSpec partitionSpec : partitions) {
+                if (partitionKeys == null) {
+                  partitionKeys = new 
LinkedHashSet<>(partitionSpec.spec().keySet());
+                  tableBuilder.addAllPartKey(partitionKeys);
+                }
+                for (String partKey : tableBuilder.getPartKeyList()) {
+                  tableBuilder.addPartVal(partitionSpec.spec().get(partKey));
+                }
+              }
               // For a given table the set of partition columns (keys) should 
not change.
-              partitionKeys = new LinkedHashSet<>(partitionSpec.keySet());
-              tableBuilder.addAllPartKey(partitionKeys);
-            }
-            for (String partKey : tableBuilder.getPartKeyList()) {
-              tableBuilder.addPartVal(partitionSpec.get(partKey));
-            }
-          }
-          requestBuilder.addTable(tableBuilder.build());
-        }
-        protoRequests.add(requestBuilder.build());
-      }
-      return protoRequests;
+              requestBuilder.addTable(tableBuilder.build());
+            });
+            return requestBuilder.build();
+          })
+          .toList();
     }
 
     /**
@@ -233,19 +224,19 @@ public 
List<LlapDaemonProtocolProtos.EvictEntityRequestProto> toProtoRequests()
      * @return true if cacheTag matches and the related buffer is eligible for 
proactive eviction, false otherwise.
      */
     public boolean isTagMatch(CacheTag cacheTag) {
-      String db = getSingleDbName();
-      if (db == null) {
-        // Number of DBs in the request was not exactly 1.
-        throw new UnsupportedOperationException("Predicate only implemented 
for 1 DB case.");
-      }
-      TableName tagTableName = TableName.fromString(cacheTag.getTableName(), 
null, null);
-
-      // Check against DB.
-      if (!db.equals(tagTableName.getDb())) {
+      // Parse the tag once and derive catalog/db from the parsed result, so 
that 2-part
+      // (db.table), 3-part (catalog.db.table), 4-part 
(catalog.db.table.metaTable) and
+      // snapshot-ref names are all interpreted consistently.
+      TableName tagTableName = parseCacheTagTableName(cacheTag.getTableName());
+      String catalog = tagTableName.getCat();
+      String db = tagTableName.getDb();
+
+      // Check that the tag's catalog and database is present in the eviction 
request.
+      if (!entities.containsKey(new CatalogDb(catalog, db))) {
         return false;
       }
 
-      Map<String, Set<LinkedHashMap<String, String>>> tables = 
entities.get(db);
+      Map<String, Set<PartitionSpec>> tables = entities.getOrDefault(new 
CatalogDb(catalog, db), Map.of());
 
       // If true, must be a drop DB event and this cacheTag matches.
       if (tables.isEmpty()) {
@@ -257,31 +248,60 @@ public boolean isTagMatch(CacheTag cacheTag) {
         tagPartDescMap = ((CacheTag.PartitionCacheTag) 
cacheTag).getPartitionDescMap();
       }
 
+      String tagDbTable = tagTableName.getNotEmptyDbTable();
       // Check against table name.
-      for (String tableAndDbName : tables.keySet()) {
-        if (tableAndDbName.equals(tagTableName.getNotEmptyDbTable())) {
-
-          Set<LinkedHashMap<String, String>> partDescs = 
tables.get(tableAndDbName);
-
-          // If true, must be a drop table event, and this cacheTag matches.
-          if (partDescs == null) {
-            return true;
+      for (Map.Entry<String, Set<PartitionSpec>> tableEntry : 
tables.entrySet()) {
+        String tableAndDbName = tableEntry.getKey();
+        Set<PartitionSpec> partDescs = tableEntry.getValue();
+        if (!tableAndDbName.equals(tagDbTable)) {
+          // Drop-table requests use db.table; Iceberg metadata tables are 
tagged db.table.metaTable.
+          if (partDescs != null || !tagDbTable.startsWith(tableAndDbName + 
".")) {
+            continue;
           }
+          return true;
+        }
 
-          // Check against partition keys and values and alas for drop 
partition event.
-          if (!(cacheTag instanceof CacheTag.PartitionCacheTag)) {
-            throw new IllegalArgumentException("CacheTag has no partition 
information, while trying" +
-                " to evict due to (and based on) a drop partition DDL 
statement..");
-          }
+        // If true, must be a drop table event, and this cacheTag matches.
+        if (partDescs == null) {
+          return true;
+        }
 
-          if (partDescs.contains(tagPartDescMap)) {
-            return true;
-          }
+        // Check against partition keys and values and alas for drop partition 
event.
+        if (!(cacheTag instanceof CacheTag.PartitionCacheTag)) {
+          throw new IllegalArgumentException("CacheTag has no partition 
information, while trying" +
+              " to evict due to (and based on) a drop partition DDL 
statement..");
+        }
+
+        if (partDescs.contains(new PartitionSpec(tagPartDescMap))) {
+          return true;
         }
       }
       return false;
     }
 
+    /**
+     * Parses a cache-tag table name into a {@link TableName}. Supports legacy 
{@code db.table},
+     * catalog-qualified {@code catalog.db.table}, and Iceberg metadata tables
+     * {@code catalog.db.table.metaTable}.
+     */
+    private static TableName parseCacheTagTableName(String fullTableName) {
+      String[] names = fullTableName.split("\\.");
+      switch (names.length) {
+        case 2:
+          return new TableName(Warehouse.DEFAULT_CATALOG_NAME, names[0], 
names[1], null);
+        case 3:
+          if (TableName.SNAPSHOT_REF.matcher(names[2]).matches()) {
+            return new TableName(Warehouse.DEFAULT_CATALOG_NAME, names[0], 
names[1], names[2]);
+          }
+          return new TableName(names[0], names[1], names[2], null);
+        case 4:
+          return new TableName(names[0], names[1], names[2], names[3]);
+        default:
+          throw new UnsupportedOperationException(
+              "Cache tag table name must have 2-4 dot-separated components: " 
+ fullTableName);
+      }
+    }
+
     @Override
     public String toString() {
       return "Request { entities = " + entities + " }";
@@ -292,7 +312,7 @@ public String toString() {
      */
     public static final class Builder {
 
-      private final Map<String, Map<String, Set<LinkedHashMap<String, 
String>>>> entities;
+      private final Map<CatalogDb, Map<String, Set<PartitionSpec>>> entities;
 
       private Builder() {
         this.entities = new HashMap<>();
@@ -302,45 +322,64 @@ public static Builder create() {
         return new Builder();
       }
 
-      public Builder addPartitionOfATable(String db, String tableName, 
LinkedHashMap<String, String> partSpec) {
-        ensureDb(db);
-        ensureTable(db, tableName);
-        entities.get(db).get(tableName).add(partSpec);
+      /**
+       * Add a partition of a table scoped to the given catalog.
+       */
+      public Builder addPartitionOfATable(String catalog, String db, String 
tableName,
+                                          Map<String, String> partSpec) {
+        ensureTable(catalog, db, tableName);
+        entities.get(new CatalogDb(catalog, db)).get(tableName).add(new 
PartitionSpec(partSpec));
         return this;
       }
 
+      /**
+       * Add a partition of a table scoped to the default catalog.
+       */
+      public Builder addPartitionOfATable(String db, String tableName, 
Map<String, String> partSpec) {
+        return addPartitionOfATable(Warehouse.DEFAULT_CATALOG_NAME, db, 
tableName, partSpec);
+      }
+
+      /**
+       * Add a database scoped to the given catalog.
+       */
+      public Builder addDb(String catalog, String db) {
+        ensureDb(catalog, db);
+        return this;
+      }
+
+      /**
+       * Add a database scoped to the default catalog.
+       */
       public Builder addDb(String db) {
-        ensureDb(db);
+        return addDb(Warehouse.DEFAULT_CATALOG_NAME, db);
+      }
+
+      /**
+       * Add a table scoped to the given catalog.
+       */
+      public Builder addTable(String catalog, String db, String table) {
+        ensureTable(catalog, db, table);
         return this;
       }
 
+      /**
+       * Add a table scoped to the default catalog.
+       */
       public Builder addTable(String db, String table) {
-        ensureDb(db);
-        ensureTable(db, table);
-        return this;
+        return addTable(Warehouse.DEFAULT_CATALOG_NAME, db, table);
       }
 
       public Request build() {
         return new Request(entities);
       }
 
-      private void ensureDb(String dbName) {
-        Map<String, Set<LinkedHashMap<String, String>>> tables = 
entities.get(dbName);
-        if (tables == null) {
-          tables = new HashMap<>();
-          entities.put(dbName, tables);
-        }
+      private void ensureDb(String catalogName, String dbName) {
+        entities.computeIfAbsent(new CatalogDb(catalogName, dbName), k -> new 
HashMap<>());
       }
 
-      private void ensureTable(String dbName, String tableName) {
-        ensureDb(dbName);
-        Map<String, Set<LinkedHashMap<String, String>>> tables = 
entities.get(dbName);
-
-        Set<LinkedHashMap<String, String>> partitions = tables.get(tableName);
-        if (partitions == null) {
-          partitions = new HashSet<>();
-          tables.put(tableName, partitions);
-        }
+      private void ensureTable(String catalogName, String dbName, String 
tableName) {
+        ensureDb(catalogName, dbName);
+        entities.get(new CatalogDb(catalogName, 
dbName)).computeIfAbsent(tableName, k -> new HashSet<>());
       }
 
       /**
@@ -350,9 +389,10 @@ private void ensureTable(String dbName, String tableName) {
        */
       public Builder 
fromProtoRequest(LlapDaemonProtocolProtos.EvictEntityRequestProto protoRequest) 
{
         entities.clear();
+        String catalogName = protoRequest.getCatalogName().toLowerCase();
         String dbName = protoRequest.getDbName().toLowerCase();
 
-        Map<String, Set<LinkedHashMap<String, String>>> entitiesInDb = new 
HashMap<>();
+        Map<String, Set<PartitionSpec>> entitiesInDb = new HashMap<>();
         List<LlapDaemonProtocolProtos.TableProto> tables = 
protoRequest.getTableList();
 
         if (tables != null && !tables.isEmpty()) {
@@ -364,8 +404,8 @@ public Builder 
fromProtoRequest(LlapDaemonProtocolProtos.EvictEntityRequestProto
               entitiesInDb.put(dbAndTableName, null);
               continue;
             }
-            Set<LinkedHashMap<String, String>> partitions = new HashSet<>();
-            LinkedHashMap<String, String> partDesc = new LinkedHashMap<>();
+            Set<PartitionSpec> partitions = new HashSet<>();
+            Map<String, String> partDesc = new HashMap<>();
 
             for (int valIx = 0; valIx < table.getPartValCount(); ++valIx) {
               int keyIx = valIx % table.getPartKeyCount();
@@ -373,15 +413,15 @@ public Builder 
fromProtoRequest(LlapDaemonProtocolProtos.EvictEntityRequestProto
               partDesc.put(table.getPartKey(keyIx).toLowerCase(), 
table.getPartVal(valIx));
 
               if (keyIx == table.getPartKeyCount() - 1) {
-                partitions.add(partDesc);
-                partDesc = new LinkedHashMap<>();
+                partitions.add(new PartitionSpec(partDesc));
+                partDesc = new HashMap<>();
               }
             }
 
             entitiesInDb.put(dbAndTableName, partitions);
           }
         }
-        entities.put(dbName, entitiesInDb);
+        entities.put(new CatalogDb(catalogName, dbName), entitiesInDb);
         return this;
       }
     }
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/ddl/database/drop/DropDatabaseAnalyzer.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/ddl/database/drop/DropDatabaseAnalyzer.java
index 11f8917334b..256b3456ff7 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/ddl/database/drop/DropDatabaseAnalyzer.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/ddl/database/drop/DropDatabaseAnalyzer.java
@@ -67,6 +67,9 @@ public void analyzeInternal(ASTNode root) throws 
SemanticException {
     if (database == null) {
       return;
     }
+    if (catalogName == null) {
+      catalogName = database.getCatalogName();
+    }
     // if cascade=true, then we need to authorize the drop table action as 
well, and add the tables to the outputs
     boolean isDbLevelLock = true;
     if (cascade) {
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/ddl/database/drop/DropDatabaseOperation.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/ddl/database/drop/DropDatabaseOperation.java
index b544b7b4a24..fe4d153f1d2 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/ddl/database/drop/DropDatabaseOperation.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/ddl/database/drop/DropDatabaseOperation.java
@@ -53,7 +53,7 @@ public int execute() throws HiveException {
 
       if (LlapHiveUtils.isLlapMode(context.getConf())) {
         ProactiveEviction.Request.Builder llapEvictRequestBuilder = 
ProactiveEviction.Request.Builder.create();
-        llapEvictRequestBuilder.addDb(dbName); // TODO catalog. add catalog 
for the cache. Depend on HIVE-29281
+        llapEvictRequestBuilder.addDb(catName, dbName);
         ProactiveEviction.evict(context.getConf(), 
llapEvictRequestBuilder.build());
       }
       // Unregister the functions as well
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/drop/DropTableOperation.java 
b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/drop/DropTableOperation.java
index b253ec5df5f..7e6c93ebdca 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/drop/DropTableOperation.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/drop/DropTableOperation.java
@@ -115,7 +115,7 @@ public int execute() throws HiveException {
     if (LlapHiveUtils.isLlapMode(context.getConf())) {
       TableName tableName = HiveTableName.of(table);
       ProactiveEviction.Request.Builder llapEvictRequestBuilder = 
ProactiveEviction.Request.Builder.create();
-      llapEvictRequestBuilder.addTable(tableName.getDb(), 
tableName.getTable());
+      llapEvictRequestBuilder.addTable(table.getCatName(), tableName.getDb(), 
tableName.getTable());
       ProactiveEviction.evict(context.getConf(), 
llapEvictRequestBuilder.build());
     }
 
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AlterTableDropPartitionOperation.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AlterTableDropPartitionOperation.java
index a0eda1ab4ef..89ad090efb5 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AlterTableDropPartitionOperation.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AlterTableDropPartitionOperation.java
@@ -147,7 +147,8 @@ private void dropPartitions(boolean isRepl) throws 
HiveException {
       DDLUtils.addIfAbsentByName(new WriteEntity(partition, 
WriteEntity.WriteType.DDL_NO_LOCK), context);
 
       if (llapEvictRequestBuilder != null) {
-        llapEvictRequestBuilder.addPartitionOfATable(tableName.getDb(), 
tableName.getTable(), partition.getSpec());
+        llapEvictRequestBuilder.addPartitionOfATable(
+            tableName.getCat(), tableName.getDb(), tableName.getTable(), 
partition.getSpec());
       }
     }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index a29e532b113..f58fafc8556 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -770,8 +770,7 @@ public static TableDesc getTableDesc(Table tbl) {
     if (tbl.getSnapshotRef() != null) {
       props.put(SNAPSHOT_REF, tbl.getSnapshotRef());
     }
-    return (new TableDesc(tbl.getInputFormatClass(), tbl
-        .getOutputFormatClass(), props));
+    return new TableDesc(tbl.getInputFormatClass(), 
tbl.getOutputFormatClass(), props, tbl.getCatName());
   }
 
   // column names and column types are all delimited by comma
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index d46eded36e6..9248897bf74 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -275,7 +275,7 @@ private VectorizedOrcAcidRowBatchReader(JobConf conf, 
OrcSplit orcSplit, Reporte
       }
       PartitionDesc partitionDesc =
           LlapHiveUtils.partitionDescForPath(orcSplit.getPath(), 
mapWork.getPathToPartitionInfo());
-      cacheTag = LlapHiveUtils.getDbAndTableNameForMetrics(orcSplit.getPath(), 
true, partitionDesc);
+      cacheTag = LlapHiveUtils.getCacheTag(orcSplit.getPath(), true, 
partitionDesc);
     } else {
       cacheTag = null;
     }
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index fbffab44a64..c346331325d 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -343,7 +343,7 @@ public static CacheTag cacheTagOfParquetFile(Path path, 
Configuration cacheConf,
       return null;
     }
     PartitionDesc partitionDesc = LlapHiveUtils.partitionDescForPath(path, 
mapWork.getPathToPartitionInfo());
-    return LlapHiveUtils.getDbAndTableNameForMetrics(path, true, 
partitionDesc);
+    return LlapHiveUtils.getCacheTag(path, true, partitionDesc);
   }
 
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
index 0dcfe72d7f5..25f8afe6d6c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
@@ -258,6 +258,13 @@ public String getTableName() {
     return tableName;
   }
 
+  /**
+   * Returns the catalog name for this partition's table.
+   */
+  public String getCatalogName() {
+    return tableDesc.getCatalogName();
+  }
+
   @Explain(displayName = "input format", explainLevels = { Level.USER, 
Level.DEFAULT, Level.EXTENDED })
   public String getInputFileFormatClassName() {
     return getInputFileFormatClass().getName();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
index 58ce207c0c6..b0e6feff7ec 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
@@ -20,6 +20,7 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.StringInternUtils;
+import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
@@ -52,6 +53,8 @@ public class TableDesc implements Serializable, Cloneable {
   public static final String SECRET_PREFIX = "TABLE_SECRET";
   public static final String SECRET_DELIMIT = "#";
 
+  private String catalogName;
+
   public TableDesc() {
   }
 
@@ -59,14 +62,31 @@ public TableDesc() {
    * @param inputFormatClass
    * @param outputFormatClass
    * @param properties must contain serde class name associate with this table.
+   * @param catalogName the catalog this table belongs to; stored as a 
dedicated field so it does
+   *                    not appear in EXPLAIN output. Pass {@code null} for 
internal/intermediate
+   *                    descriptors that are not backed by a real user table; 
{@code null} will be
+   *                    normalized to {@link Warehouse#DEFAULT_CATALOG_NAME}.
    */
   public TableDesc(
       final Class<? extends InputFormat> inputFormatClass,
-      final Class<?> outputFormatClass, final Properties properties) {
+      final Class<?> outputFormatClass, final Properties properties,
+      final String catalogName) {
     this.inputFileFormatClass = inputFormatClass;
     outputFileFormatClass = HiveFileFormatUtils
         .getOutputFormatSubstitute(outputFormatClass);
     setProperties(properties);
+    this.catalogName = catalogName == null ? Warehouse.DEFAULT_CATALOG_NAME : 
catalogName;
+  }
+
+  /**
+   * @param inputFormatClass
+   * @param outputFormatClass
+   * @param properties must contain serde class name associate with this table.
+   */
+  public TableDesc(
+      final Class<? extends InputFormat> inputFormatClass,
+      final Class<?> outputFormatClass, final Properties properties) {
+    this(inputFormatClass, outputFormatClass, properties, null);
   }
 
   public Class<? extends AbstractSerDe> getSerDeClass() {
@@ -199,6 +219,14 @@ public int getBucketingVersion() {
         
properties.getProperty(hive_metastoreConstants.TABLE_BUCKETING_VERSION));
   }
 
+  public String getCatalogName() {
+    return catalogName;
+  }
+
+  public void setCatalogName(String catalogName) {
+    this.catalogName = catalogName == null ? Warehouse.DEFAULT_CATALOG_NAME : 
catalogName;
+  }
+
   @Override
   public Object clone() {
     TableDesc ret = new TableDesc();
@@ -215,6 +243,7 @@ public Object clone() {
     if (jobProperties != null) {
       ret.jobProperties = new LinkedHashMap<String, String>(jobProperties);
     }
+    ret.catalogName = catalogName == null ? Warehouse.DEFAULT_CATALOG_NAME : 
catalogName;
     return ret;
   }
 
diff --git a/ql/src/test/results/clientpositive/llap/db_ddl_explain.q.out 
b/ql/src/test/results/clientpositive/llap/db_ddl_explain.q.out
index 257d69751fd..3dc0c6bbc39 100644
--- a/ql/src/test/results/clientpositive/llap/db_ddl_explain.q.out
+++ b/ql/src/test/results/clientpositive/llap/db_ddl_explain.q.out
@@ -159,6 +159,7 @@ STAGE DEPENDENCIES:
 STAGE PLANS:
   Stage: Stage-0
     Drop Database
+      catalog: hive
       database: d
       if exists: false
 
diff --git 
a/storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java 
b/storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java
index 0f5d7b91516..66ef19e1d35 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java
@@ -30,17 +30,19 @@
 /**
  * Used for identifying the related object of the buffer stored in cache.
  * Comes in 3 flavours to optimize for minimal memory overhead:
- * - TableCacheTag for tables without partitions: DB/table level
- * - SinglePartitionCacheTag for tables with 1 partition level: 
DB/table/1st_partition
+ * - TableCacheTag for tables without partitions: catalog.DB.table level
+ * - SinglePartitionCacheTag for tables with 1 partition level: 
catalog.DB.table/1st_partition
  * - MultiPartitionCacheTag for tables with &gt; 1 partition levels:
- *     DB/table/1st_partition/.../nth_partition .
+ *     catalog.DB.table/1st_partition/.../nth_partition .
  */
 public abstract class CacheTag implements Comparable<CacheTag> {
 
   private static final String ENCODING = "UTF-8";
 
   /**
-   * Prepended by DB name and '.' .
+   * Catalog-qualified, DB-qualified table name.  Stored as {@code 
catalog.db.table}, e.g.
+   * {@code hive.salesdb.orders}.  For DB-level parent tags produced by
+   * {@link #createParentCacheTag} this is just {@code catalog.db}.
    */
   protected final String tableName;
 
@@ -48,6 +50,9 @@ private CacheTag(String tableName) {
     this.tableName = tableName.intern();
   }
 
+  /**
+   * Returns the full catalog-qualified, DB-qualified name, i.e. {@code 
catalog.db.table}.
+   */
   public String getTableName() {
     return tableName;
   }
@@ -71,8 +76,7 @@ public boolean equals(Object obj) {
 
   @Override
   public int hashCode() {
-    int res = tableName.hashCode();
-    return res;
+    return tableName.hashCode();
   }
 
   public static final CacheTag build(String tableName) {
@@ -82,8 +86,16 @@ public static final CacheTag build(String tableName) {
     return new TableCacheTag(tableName);
   }
 
-  public static final CacheTag build(String tableName, LinkedHashMap<String, 
String> partDescMap) {
-    if (StringUtils.isEmpty(tableName) || partDescMap == null || 
partDescMap.isEmpty()) {
+  public static final CacheTag build(String catalogName, String 
dbAndTableName) {
+    return build(catalogName + "." + dbAndTableName);
+  }
+
+  public static final CacheTag build(String catalogName, String 
dbAndTableName, LinkedHashMap<String, String> partDescMap) {
+    return build(catalogName + "." + dbAndTableName, partDescMap);
+  }
+
+  public static final CacheTag build(String fullTableName, 
LinkedHashMap<String, String> partDescMap) {
+    if (StringUtils.isEmpty(fullTableName) || partDescMap == null || 
partDescMap.isEmpty()) {
       throw new IllegalArgumentException();
     }
 
@@ -95,10 +107,10 @@ public static final CacheTag build(String tableName, 
LinkedHashMap<String, Strin
     }
 
     if (partDescs.length == 1) {
-      return new SinglePartitionCacheTag(tableName, partDescs[0]);
+      return new SinglePartitionCacheTag(fullTableName, partDescs[0]);
     } else {
       // In this case it must be >1
-      return new MultiPartitionCacheTag(tableName, partDescs);
+      return new MultiPartitionCacheTag(fullTableName, partDescs);
     }
   }
 
@@ -118,7 +130,10 @@ public static final CacheTag build(String tableName, 
List<String> partDescs) {
   /**
    * Constructs a (fake) parent CacheTag instance by walking back in the 
hierarchy i.e. stepping
    * from inner to outer partition levels, then producing a CacheTag for the 
table and finally
-   * the DB.
+   * the DB.  The catalog prefix is preserved throughout the walk.
+   *
+   * <p>The walk terminates at the DB level: a tag whose {@code tableName} 
contains exactly one
+   * dot (i.e. {@code catalog.db}) has no parent, so {@code null} is returned.
    */
   public static final CacheTag createParentCacheTag(CacheTag tag) {
     if (tag == null) {
@@ -134,20 +149,18 @@ public static final CacheTag 
createParentCacheTag(CacheTag tag) {
         }
         return new MultiPartitionCacheTag(multiPartitionCacheTag.tableName, 
subList);
       } else {
-        return new SinglePartitionCacheTag(multiPartitionCacheTag.tableName,
-            multiPartitionCacheTag.partitionDesc[0]);
+        return new SinglePartitionCacheTag(
+            multiPartitionCacheTag.tableName, 
multiPartitionCacheTag.partitionDesc[0]);
       }
     }
 
     if (tag instanceof SinglePartitionCacheTag) {
       return new TableCacheTag(tag.tableName);
     } else {
-      // DB level
-      int ix = tag.tableName.indexOf(".");
-      if (ix <= 0) {
+      if (tag.tableName.split("\\.", 3).length < 3) {
         return null;
       }
-      return new TableCacheTag(tag.tableName.substring(0, ix));
+      return new TableCacheTag(tag.tableName.substring(0, 
tag.tableName.lastIndexOf('.')));
     }
 
   }
@@ -381,4 +394,3 @@ private static String[] decodePartDesc(String partDesc) {
   }
 
 }
-

Reply via email to