This is an automated email from the ASF dual-hosted git repository.
deniskuzZ pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 4c4a6187072 HIVE-29281: Make proactive cache eviction work with catalog (#6379)
4c4a6187072 is described below
commit 4c4a6187072ff2cd90c4dc4962a308200e6fdb66
Author: Neeraj Khatri <[email protected]>
AuthorDate: Mon Jun 15 17:45:07 2026 +0530
HIVE-29281: Make proactive cache eviction work with catalog (#6379)
---
.../apache/iceberg/orc/VectorizedReadUtils.java | 2 +-
llap-common/src/protobuf/LlapDaemonProtocol.proto | 3 +-
.../io/api/impl/LlapCacheMetadataSerializer.java | 16 +-
.../hadoop/hive/llap/io/api/impl/LlapIoImpl.java | 8 +-
.../hive/llap/io/encoded/OrcEncodedDataReader.java | 2 +-
.../llap/io/encoded/SerDeEncodedDataReader.java | 2 +-
.../llap/io/metadata/OrcFileEstimateErrors.java | 4 +-
.../hive/llap/cache/TestCacheContentsTracker.java | 59 ++--
.../hadoop/hive/llap/cache/TestFileCache.java | 3 +-
.../hive/llap/cache/TestLowLevelCacheImpl.java | 8 +-
.../hive/llap/cache/TestOrcMetadataCache.java | 19 +-
.../hive/llap/cache/TestProactiveEviction.java | 327 +++++++++++++++++++++
.../api/impl/TestLlapCacheMetadataSerializer.java | 2 +-
.../org/apache/hadoop/hive/llap/LlapHiveUtils.java | 20 +-
.../apache/hadoop/hive/llap/ProactiveEviction.java | 256 +++++++++-------
.../ql/ddl/database/drop/DropDatabaseAnalyzer.java | 3 +
.../ddl/database/drop/DropDatabaseOperation.java | 2 +-
.../hive/ql/ddl/table/drop/DropTableOperation.java | 2 +-
.../drop/AlterTableDropPartitionOperation.java | 3 +-
.../org/apache/hadoop/hive/ql/exec/Utilities.java | 3 +-
.../ql/io/orc/VectorizedOrcAcidRowBatchReader.java | 2 +-
.../vector/VectorizedParquetRecordReader.java | 2 +-
.../apache/hadoop/hive/ql/plan/PartitionDesc.java | 7 +
.../org/apache/hadoop/hive/ql/plan/TableDesc.java | 31 +-
.../clientpositive/llap/db_ddl_explain.q.out | 1 +
.../org/apache/hadoop/hive/common/io/CacheTag.java | 48 +--
26 files changed, 640 insertions(+), 195 deletions(-)
diff --git
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java
index c05f8bc62ab..34b99d6b00b 100644
---
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java
+++
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java
@@ -79,7 +79,7 @@ public static ByteBuffer getSerializedOrcTail(Path path,
SyntheticFileId fileId,
// Note: Since Hive doesn't know about partition information of Iceberg
tables, partitionDesc is only used to
// deduct the table (and DB) name here.
CacheTag cacheTag = HiveConf.getBoolVar(job,
HiveConf.ConfVars.LLAP_TRACK_CACHE_USAGE) ?
- LlapHiveUtils.getDbAndTableNameForMetrics(path, true, partitionDesc)
: null;
+ LlapHiveUtils.getCacheTag(path, true, partitionDesc) : null;
try {
// Schema has to be serialized and deserialized as it is passed
between different packages of TypeDescription:
diff --git a/llap-common/src/protobuf/LlapDaemonProtocol.proto
b/llap-common/src/protobuf/LlapDaemonProtocol.proto
index 641958aef8a..8b15f4392eb 100644
--- a/llap-common/src/protobuf/LlapDaemonProtocol.proto
+++ b/llap-common/src/protobuf/LlapDaemonProtocol.proto
@@ -233,10 +233,11 @@ message SetCapacityRequestProto {
message SetCapacityResponseProto {
}
-// Used for proactive eviction request. Must contain one DB name, and
optionally table information.
+// Used for proactive eviction request. Must contain a DB name, and optionally
table information and catalog name.
message EvictEntityRequestProto {
required string db_name = 1;
repeated TableProto table = 2;
+ optional string catalog_name = 3 [default = "hive"];
}
// Used in EvictEntityRequestProto, can be used for non-partitioned and
partitioned tables too.
diff --git
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapCacheMetadataSerializer.java
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapCacheMetadataSerializer.java
index dcb90ec197d..aa3b6fdd066 100644
---
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapCacheMetadataSerializer.java
+++
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapCacheMetadataSerializer.java
@@ -33,6 +33,7 @@
import org.apache.hadoop.hive.llap.io.encoded.LlapOrcCacheLoader;
import org.apache.hadoop.hive.ql.io.SyntheticFileId;
import org.apache.hadoop.hive.ql.io.orc.encoded.IoTrace;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hive.common.util.FixedSizedObjectPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -149,7 +150,7 @@ public void
loadData(LlapDaemonProtocolProtos.CacheEntryList data) {
}
private void loadData(LlapDaemonProtocolProtos.CacheEntry ce) throws
IOException {
- CacheTag cacheTag = decodeCacheTag(ce.getCacheTag());
+ CacheTag cacheTag = decodeCacheTag(ce.getCacheTag(), conf);
DiskRangeList ranges = decodeRanges(ce.getRangesList());
Object fileKey = decodeFileKey(ce.getFileKey());
try (LlapOrcCacheLoader llr = new LlapOrcCacheLoader(new
Path(ce.getFilePath()), fileKey, conf, cache,
@@ -167,9 +168,16 @@ private static DiskRangeList
decodeRanges(List<LlapDaemonProtocolProtos.CacheEnt
return helper.get();
}
- private static CacheTag decodeCacheTag(LlapDaemonProtocolProtos.CacheTag ct)
{
- return ct.getPartitionDescCount() == 0 ? CacheTag.build(ct.getTableName())
: CacheTag
- .build(ct.getTableName(), ct.getPartitionDescList());
+ private static CacheTag decodeCacheTag(LlapDaemonProtocolProtos.CacheTag ct,
Configuration conf) {
+ String tableName = ct.getTableName();
+ String[] parts = tableName.split("\\.");
+ if (parts.length == 2) {
+ // db.table without catalog, prepend current or default catalog
+ tableName = HiveUtils.getCurrentCatalogOrDefault(conf) + '.' + tableName;
+ }
+ return ct.getPartitionDescCount() == 0
+ ? CacheTag.build(tableName)
+ : CacheTag.build(tableName, ct.getPartitionDescList());
}
@VisibleForTesting
diff --git
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index 22407d8a2e3..f26af3d9036 100644
---
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -324,9 +324,11 @@ public long
evictEntity(LlapDaemonProtocolProtos.EvictEntityRequestProto protoRe
if (LOG.isDebugEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append(markedBytes).append(" bytes marked for eviction from LLAP
cache buffers that belong to table(s): ");
- for (String table :
request.getEntities().get(request.getSingleDbName()).keySet()) {
- sb.append(table).append(" ");
- }
+ request.getEntities().forEach((catalogdb, tables) ->
+ tables.forEach((table, partitions) ->
+
sb.append(catalogdb.catalog()).append(".").append(catalogdb.database())
+ .append(".").append(table).append(" "))
+ );
sb.append(" Duration: ").append(time).append(" ms");
LOG.debug(sb.toString());
}
diff --git
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 75a71560b81..76f95a3ec33 100644
---
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -235,7 +235,7 @@ public OrcEncodedDataReader(LowLevelCache lowLevelCache,
BufferUsageManager buff
// LlapInputFormat needs to know the file schema to decide if schema
evolution is supported.
PartitionDesc partitionDesc =
LlapHiveUtils.partitionDescForPath(split.getPath(), parts);
cacheTag = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_TRACK_CACHE_USAGE)
- ? LlapHiveUtils.getDbAndTableNameForMetrics(split.getPath(), true,
partitionDesc) : null;
+ ? LlapHiveUtils.getCacheTag(split.getPath(), true, partitionDesc) :
null;
// 1. Get file metadata from cache, or create the reader and read it.
// Don't cache the filesystem object for now; Tez closes it and FS cache
will fix all that
fsSupplier = getFsSupplier(split.getPath(), jobConf);
diff --git
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
index 3322136366d..e32b0584c88 100644
---
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
+++
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
@@ -225,7 +225,7 @@ public MemoryBuffer create() {
PartitionDesc partitionDesc =
LlapHiveUtils.partitionDescForPath(split.getPath(), parts);
fileKey = determineCacheKey(fs, split, partitionDesc, daemonConf);
cacheTag = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_TRACK_CACHE_USAGE)
- ? LlapHiveUtils.getDbAndTableNameForMetrics(split.getPath(), true,
partitionDesc) : null;
+ ? LlapHiveUtils.getCacheTag(split.getPath(), true, partitionDesc) :
null;
this.sourceInputFormat = sourceInputFormat;
this.sourceSerDe = sourceSerDe;
this.reporter = reporter;
diff --git
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java
index 02ee55f250e..1c975b623d0 100644
---
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java
+++
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java
@@ -26,6 +26,7 @@
import org.apache.hadoop.hive.common.io.DiskRangeList.MutateHelper;
import org.apache.hadoop.hive.common.io.CacheTag;
import org.apache.hadoop.hive.llap.cache.EvictionDispatcher;
+import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.llap.cache.LlapCacheableBuffer;
import org.apache.hadoop.hive.ql.io.SyntheticFileId;
import org.apache.hadoop.hive.ql.io.orc.encoded.IncompleteCb;
@@ -140,7 +141,6 @@ public boolean isMarkedForEviction() {
@Override
public CacheTag getTag() {
- // We don't care about these.
- return CacheTag.build("OrcEstimates");
+ return CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME, "OrcEstimates");
}
}
\ No newline at end of file
diff --git
a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheContentsTracker.java
b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheContentsTracker.java
index 15d3f8fd157..08eb1f45d93 100644
---
a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheContentsTracker.java
+++
b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheContentsTracker.java
@@ -17,11 +17,10 @@
*/
package org.apache.hadoop.hive.llap.cache;
-import java.util.HashMap;
import java.util.LinkedHashMap;
-import java.util.Map;
import org.apache.hadoop.hive.common.io.CacheTag;
+import org.apache.hadoop.hive.metastore.Warehouse;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -127,7 +126,7 @@ public void testCacheTagComparison() {
public void testEncodingDecoding() throws Exception {
LinkedHashMap<String, String> partDescs = new LinkedHashMap<>();
partDescs.put("pytha=goras", "a2+b2=c2");
- CacheTag tag = CacheTag.build("math.rules", partDescs);
+ CacheTag tag = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME,
"math.rules", partDescs);
CacheTag.SinglePartitionCacheTag stag =
((CacheTag.SinglePartitionCacheTag)tag);
assertEquals("pytha=goras=a2+b2=c2", stag.partitionDescToString());
assertEquals(1, stag.getPartitionDescMap().size());
@@ -136,7 +135,7 @@ public void testEncodingDecoding() throws Exception {
partDescs.clear();
partDescs.put("mutli=one", "one=/1");
partDescs.put("mutli=two/", "two=2");
- tag = CacheTag.build("math.rules", partDescs);
+ tag = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME, "math.rules",
partDescs);
CacheTag.MultiPartitionCacheTag mtag =
((CacheTag.MultiPartitionCacheTag)tag);
assertEquals("mutli=one=one=/1/mutli=two/=two=2",
mtag.partitionDescToString());
assertEquals(2, mtag.getPartitionDescMap().size());
@@ -168,6 +167,10 @@ private static LlapCacheableBuffer createMockBuffer(long
size, CacheTag cacheTag
}
public static CacheTag cacheTagBuilder(String dbAndTable, String...
partitions) {
+ String[] parts = dbAndTable.split("\\.");
+ if(parts.length < 3) {
+ dbAndTable = Warehouse.DEFAULT_CATALOG_NAME + "." + dbAndTable;
+ }
if (partitions != null && partitions.length > 0) {
LinkedHashMap<String, String> partDescs = new LinkedHashMap<>();
for (String partition : partitions) {
@@ -215,33 +218,33 @@ private static void evictSomeTestBuffers() {
private static final String EXPECTED_CACHE_STATE_WHEN_FULL =
"\n" +
"Cache state: \n" +
- "default : 2/2, 2101248/2101248\n" +
- "default.testtable : 2/2, 2101248/2101248\n" +
- "otherdb : 7/7, 1611106304/1611106304\n" +
- "otherdb.testtable : 4/4, 231424/231424\n" +
- "otherdb.testtable/p=v1 : 3/3, 100352/100352\n" +
- "otherdb.testtable/p=v1/pp=vv1 : 2/2, 34816/34816\n" +
- "otherdb.testtable/p=v1/pp=vv2 : 1/1, 65536/65536\n" +
- "otherdb.testtable/p=v2 : 1/1, 131072/131072\n" +
- "otherdb.testtable/p=v2/pp=vv1 : 1/1, 131072/131072\n" +
- "otherdb.testtable2 : 2/2, 537133056/537133056\n" +
- "otherdb.testtable2/p=v3 : 2/2, 537133056/537133056\n" +
- "otherdb.testtable3 : 1/1, 1073741824/1073741824";
+ "hive.default : 2/2, 2101248/2101248\n" +
+ "hive.default.testtable : 2/2, 2101248/2101248\n" +
+ "hive.otherdb : 7/7, 1611106304/1611106304\n" +
+ "hive.otherdb.testtable : 4/4, 231424/231424\n" +
+ "hive.otherdb.testtable/p=v1 : 3/3, 100352/100352\n" +
+ "hive.otherdb.testtable/p=v1/pp=vv1 : 2/2, 34816/34816\n" +
+ "hive.otherdb.testtable/p=v1/pp=vv2 : 1/1, 65536/65536\n" +
+ "hive.otherdb.testtable/p=v2 : 1/1, 131072/131072\n" +
+ "hive.otherdb.testtable/p=v2/pp=vv1 : 1/1, 131072/131072\n" +
+ "hive.otherdb.testtable2 : 2/2, 537133056/537133056\n" +
+ "hive.otherdb.testtable2/p=v3 : 2/2, 537133056/537133056\n" +
+ "hive.otherdb.testtable3 : 1/1, 1073741824/1073741824";
private static final String EXPECTED_CACHE_STATE_AFTER_EVICTION =
"\n" +
"Cache state: \n" +
- "default : 0/2, 0/2101248\n" +
- "default.testtable : 0/2, 0/2101248\n" +
- "otherdb : 5/7, 1074202624/1611106304\n" +
- "otherdb.testtable : 3/4, 198656/231424\n" +
- "otherdb.testtable/p=v1 : 2/3, 67584/100352\n" +
- "otherdb.testtable/p=v1/pp=vv1 : 1/2, 2048/34816\n" +
- "otherdb.testtable/p=v1/pp=vv2 : 1/1, 65536/65536\n" +
- "otherdb.testtable/p=v2 : 1/1, 131072/131072\n" +
- "otherdb.testtable/p=v2/pp=vv1 : 1/1, 131072/131072\n" +
- "otherdb.testtable2 : 1/2, 262144/537133056\n" +
- "otherdb.testtable2/p=v3 : 1/2, 262144/537133056\n" +
- "otherdb.testtable3 : 1/1, 1073741824/1073741824";
+ "hive.default : 0/2, 0/2101248\n" +
+ "hive.default.testtable : 0/2, 0/2101248\n" +
+ "hive.otherdb : 5/7, 1074202624/1611106304\n" +
+ "hive.otherdb.testtable : 3/4, 198656/231424\n" +
+ "hive.otherdb.testtable/p=v1 : 2/3, 67584/100352\n" +
+ "hive.otherdb.testtable/p=v1/pp=vv1 : 1/2, 2048/34816\n" +
+ "hive.otherdb.testtable/p=v1/pp=vv2 : 1/1, 65536/65536\n" +
+ "hive.otherdb.testtable/p=v2 : 1/1, 131072/131072\n" +
+ "hive.otherdb.testtable/p=v2/pp=vv1 : 1/1, 131072/131072\n" +
+ "hive.otherdb.testtable2 : 1/2, 262144/537133056\n" +
+ "hive.otherdb.testtable2/p=v3 : 1/2, 262144/537133056\n" +
+ "hive.otherdb.testtable3 : 1/1, 1073741824/1073741824";
}
diff --git
a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestFileCache.java
b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestFileCache.java
index 34203ddf5d6..f5bb1e0d254 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestFileCache.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestFileCache.java
@@ -19,6 +19,7 @@
import com.google.common.base.Function;
import org.apache.hadoop.hive.common.io.CacheTag;
+import org.apache.hadoop.hive.metastore.Warehouse;
import org.junit.Test;
import java.util.concurrent.ConcurrentHashMap;
@@ -32,7 +33,7 @@ public void testFileCacheMetadata() {
ConcurrentHashMap<Object, FileCache<Object>> cache = new
ConcurrentHashMap<>();
Object fileKey = 1234L;
Function<Void, Object> f = a -> new Object();
- CacheTag tag = CacheTag.build("test_table");
+ CacheTag tag = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME,
"test_db.test_table");
FileCache<Object> result = FileCache.getOrAddFileSubCache(cache, fileKey,
f, tag);
diff --git
a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
index 4e3c10ed6b2..764dd9ec319 100644
---
a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
+++
b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
@@ -39,6 +39,7 @@
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.common.io.CacheTag;
+import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.common.io.DiskRange;
import org.apache.hadoop.hive.common.io.DiskRangeList;
import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory;
@@ -309,13 +310,14 @@ private void _testProactiveEvictionMark(boolean
isInstantDeallocation) {
LlapDataBuffer[] buffs1 = IntStream.range(0, 4).mapToObj(i ->
fb()).toArray(LlapDataBuffer[]::new);
DiskRange[] drs1 = drs(IntStream.range(1, 5).toArray());
- CacheTag tag1 = CacheTag.build("default.table1");
+ CacheTag tag1 = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME,
"default.table1");
LlapDataBuffer[] buffs2 = IntStream.range(0, 41).mapToObj(i ->
fb()).toArray(LlapDataBuffer[]::new);
DiskRange[] drs2 = drs(IntStream.range(1, 42).toArray());
- CacheTag tag2 = CacheTag.build("default.table2");
+ CacheTag tag2 = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME,
"default.table2");
- Predicate<CacheTag> predicate = tag ->
"default.table1".equals(tag.getTableName());
+ Predicate<CacheTag> predicate = tag ->
+ (Warehouse.DEFAULT_CATALOG_NAME + "." +
"default.table1").equals(tag.getTableName());
cache.putFileData(fn1, drs1, buffs1, 0, Priority.NORMAL, null, tag1);
cache.putFileData(fn2, drs2, buffs2, 0, Priority.NORMAL, null, tag2);
diff --git
a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
index 62a8c747439..5ec15beccfb 100644
---
a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
+++
b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
@@ -37,6 +37,7 @@
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.IllegalCacheConfigurationException;
+import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
import org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader;
import org.apache.hadoop.hive.llap.io.metadata.MetadataCache;
@@ -249,7 +250,7 @@ public void testGetOrcTailForPath() throws Exception {
Path path = new Path("../data/files/alltypesorc");
Configuration jobConf = new Configuration();
Configuration daemonConf = new Configuration();
- CacheTag tag = CacheTag.build("test-table");
+ CacheTag tag = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME,
"test-db.test-table");
OrcTail uncached = OrcEncodedDataReader.getOrcTailForPath(path, jobConf,
tag, daemonConf, cache, null);
jobConf.set(HiveConf.ConfVars.LLAP_IO_CACHE_ONLY.varname, "true");
OrcTail cached = OrcEncodedDataReader.getOrcTailForPath(path, jobConf,
tag, daemonConf, cache, null);
@@ -270,7 +271,7 @@ public void testGetOrcTailForPathWithFileId() throws
Exception {
Path path = new Path("../data/files/alltypesorc");
Configuration jobConf = new Configuration();
Configuration daemonConf = new Configuration();
- CacheTag tag = CacheTag.build("test-table");
+ CacheTag tag = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME,
"test-db.test-table");
FileSystem fs = FileSystem.get(daemonConf);
FileStatus fileStatus = fs.getFileStatus(path);
OrcTail uncached =
OrcEncodedDataReader.getOrcTailForPath(fileStatus.getPath(), jobConf, tag,
daemonConf, cache, new SyntheticFileId(fileStatus));
@@ -294,7 +295,7 @@ public void testGetOrcTailForPathWithFileIdChange() throws
Exception {
Path path = new Path("../data/files/alltypesorc");
Configuration jobConf = new Configuration();
Configuration daemonConf = new Configuration();
- CacheTag tag = CacheTag.build("test-table");
+ CacheTag tag = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME,
"test-db.test-table");
OrcEncodedDataReader.getOrcTailForPath(path, jobConf, tag, daemonConf,
cache, new SyntheticFileId(path, 100, 100));
jobConf.set(HiveConf.ConfVars.LLAP_IO_CACHE_ONLY.varname, "true");
Exception ex = null;
@@ -337,19 +338,23 @@ public void testProactiveEvictionMark() throws Exception {
// below is of length 65
ByteBuffer bb2 =
ByteBuffer.wrap("-large-meta-data-content-large-meta-data-content-large-meta-data-".getBytes());
- LlapBufferOrBuffers table1Buffers1 = cache.putFileMetadata(fn1, bb,
CacheTag.build("default.table1"), isStopped);
+ LlapBufferOrBuffers table1Buffers1 = cache.putFileMetadata(fn1, bb,
+ CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME, "default.table1"),
isStopped);
assertNotNull(table1Buffers1.getSingleLlapBuffer());
- LlapBufferOrBuffers table1Buffers2 = cache.putFileMetadata(fn2, bb2,
CacheTag.build("default.table1"), isStopped);
+ LlapBufferOrBuffers table1Buffers2 = cache.putFileMetadata(fn2, bb2,
+ CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME, "default.table1"),
isStopped);
assertNotNull(table1Buffers2.getMultipleLlapBuffers());
assertEquals(2, table1Buffers2.getMultipleLlapBuffers().length);
// Case for when metadata consists of just 1 buffer (most of the realworld
cases)
ByteBuffer bb3 =
ByteBuffer.wrap("small-meta-data-content-for-otherFile".getBytes());
- LlapBufferOrBuffers table2Buffers1 = cache.putFileMetadata(fn3, bb3,
CacheTag.build("default.table2"), isStopped);
+ LlapBufferOrBuffers table2Buffers1 = cache.putFileMetadata(fn3, bb3,
+ CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME, "default.table2"),
isStopped);
assertNotNull(table2Buffers1.getSingleLlapBuffer());
- Predicate<CacheTag> predicate = tag ->
"default.table1".equals(tag.getTableName());
+ Predicate<CacheTag> predicate = tag ->
+ (Warehouse.DEFAULT_CATALOG_NAME +
".default.table1").equals(tag.getTableName());
// Simulating eviction on some buffers
table1Buffers2.getMultipleLlapBuffers()[1].decRef();
diff --git
a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestProactiveEviction.java
b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestProactiveEviction.java
index 89c8f605503..e75237b8765 100644
---
a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestProactiveEviction.java
+++
b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestProactiveEviction.java
@@ -22,6 +22,8 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -32,7 +34,9 @@
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.ProactiveEviction.Request;
import org.apache.hadoop.hive.llap.ProactiveEviction.Request.Builder;
+import
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EvictEntityRequestProto;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
+import org.apache.hadoop.hive.metastore.Warehouse;
import com.google.common.annotations.VisibleForTesting;
@@ -42,6 +46,7 @@
import static
org.apache.hadoop.hive.llap.cache.TestCacheContentsTracker.cacheTagBuilder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
/**
@@ -106,6 +111,328 @@ private static void assertMatchOnTags(Builder
requestBuilder, String expected) {
assertEquals(expected, sb.toString());
}
+ /**
+ * Verifies that passing an explicit catalog produces correct matching via
isTagMatch.
+ * TEST_TAGS all belong to the default catalog, so requests for a different
catalog must not match.
+ */
+ @Test
+ public void testCatalogAwareCacheTagAndRequestMatching() {
+ // Default catalog matches as expected.
+ assertMatchOnTags(Builder.create().addDb("fx"), "111111111111000000");
+ assertMatchOnTags(Builder.create().addTable("fx", "futures"),
"000001111000000000");
+ assertMatchOnTags(Builder.create().addPartitionOfATable("fx", "futures",
+ buildParts("ccy", "JPY")), "000000110000000000");
+ assertMatchOnTags(Builder.create().addTable("fixedincome", "bonds"),
"000000000000000110");
+ assertMatchOnTags(Builder.create().addPartitionOfATable("fx", "rates",
+ buildParts("from", "EUR", "to", "HUF")), "000010000000000000");
+
+ // Non-default catalog: CacheTag now carries catalog info, so none of the
TEST_TAGS
+ // (all default-catalog) should match requests targeting a different
catalog.
+ assertMatchOnTags(Builder.create().addDb("custom_catalog", "fx"),
"000000000000000000");
+ assertMatchOnTags(Builder.create().addTable("custom_catalog", "equity",
"prices"),
+ "000000000000000000");
+ assertMatchOnTags(Builder.create().addPartitionOfATable(
+ "custom_catalog", "equity", "prices", buildParts("ex", "NYSE")),
+ "000000000000000000");
+ }
+
+ /**
+ * Verifies that catalog_name is serialized into the proto and correctly
restored via fromProtoRequest.
+ */
+ @Test
+ public void testProtoRoundTripPreservesCatalog() {
+ // Default catalog is always serialized into the proto.
+ Request defaultCatRequest = Builder.create().addDb("testdb").build();
+ List<EvictEntityRequestProto> protos = defaultCatRequest.toProtoRequests();
+ assertEquals(1, protos.size());
+ EvictEntityRequestProto proto = protos.get(0);
+ assertEquals(Warehouse.DEFAULT_CATALOG_NAME, proto.getCatalogName());
+ assertEquals("testdb", proto.getDbName());
+
+ Request roundTripped = Builder.create().fromProtoRequest(proto).build();
+ assertTrue(roundTripped.hasDatabaseName(Warehouse.DEFAULT_CATALOG_NAME,
"testdb"));
+
+ // Custom catalog is also preserved.
+ Request customCatRequest = Builder.create().addTable("spark_catalog",
"salesdb", "orders").build();
+ protos = customCatRequest.toProtoRequests();
+ assertEquals(1, protos.size());
+ proto = protos.get(0);
+ assertEquals("spark_catalog", proto.getCatalogName());
+ assertEquals("salesdb", proto.getDbName());
+
+ roundTripped = Builder.create().fromProtoRequest(proto).build();
+ assertTrue(roundTripped.hasDatabaseName("spark_catalog", "salesdb"));
+ }
+
+ /**
+ * Verifies that entities in different catalogs are independently scoped
even when they share
+ * the same DB name.
+ */
+ @Test
+ public void testMultiCatalogBuilderScoping() {
+ // Two different catalogs, each with the same DB name but different tables.
+ Request request = Builder.create()
+ .addTable("catalog_a", "shared_db", "table_a")
+ .addTable("catalog_b", "shared_db", "table_b")
+ .build();
+
+ assertEquals(2, request.getEntities().size());
+ assertTrue(request.getEntities().containsKey(new
Request.CatalogDb("catalog_a", "shared_db")));
+ assertTrue(request.getEntities().containsKey(new
Request.CatalogDb("catalog_b", "shared_db")));
+
+ // catalog_a only knows about table_a.
+ assertTrue(request.getEntities().get(new Request.CatalogDb("catalog_a",
"shared_db")).containsKey("table_a"));
+ assertFalse(request.getEntities().get(new Request.CatalogDb("catalog_a",
"shared_db")).containsKey("table_b"));
+
+ // catalog_b only knows about table_b.
+ assertTrue(request.getEntities().get(new Request.CatalogDb("catalog_b",
"shared_db")).containsKey("table_b"));
+ assertFalse(request.getEntities().get(new Request.CatalogDb("catalog_b",
"shared_db")).containsKey("table_a"));
+ }
+
+ /**
+ * Verifies that multiple tables and partitions added to the same catalog+DB
are merged
+ * into a single catalog entry (no duplication).
+ */
+ @Test
+ public void testSameCatalogMultipleEntitiesMergedCorrectly() {
+ Request request = Builder.create()
+ .addTable("mydb", "table1")
+ .addTable("mydb", "table2")
+ .addPartitionOfATable("mydb", "table3", buildParts("dt", "2024-01-01"))
+ .addPartitionOfATable("mydb", "table3", buildParts("dt", "2024-01-02"))
+ .build();
+
+ assertTrue(request.hasDatabaseName(Warehouse.DEFAULT_CATALOG_NAME,
"mydb"));
+ // One catalog, one DB, three tables.
+ assertEquals(1, request.getEntities().size());
+ assertEquals(3, request.getEntities()
+ .get(new Request.CatalogDb(Warehouse.DEFAULT_CATALOG_NAME,
"mydb")).size());
+ // table3 has two partition specs.
+ assertEquals(2, request.getEntities()
+ .get(new Request.CatalogDb(Warehouse.DEFAULT_CATALOG_NAME,
"mydb")).get("table3").size());
+ }
+
+ /**
+ * Verifies that CacheTag catalog information is correctly used to isolate
eviction between catalogs.
+ * A request targeting catalog A must not evict buffers that belong to
catalog B, even when the
+ * DB and table names are identical.
+ */
+ @Test
+ public void testCatalogIsolationInIsTagMatch() {
+ CacheTag defaultCatalogTag = cacheTagBuilder("fx.rates", "from=USD",
"to=HUF");
+ CacheTag otherCatalogTag = cacheTagBuilder("other_catalog.fx.rates",
"from=USD", "to=HUF");
+
+ // Request for the default catalog's "fx" DB matches only default-catalog
tags.
+ Request defaultCatalogRequest = Builder.create()
+ .fromProtoRequest(Builder.create()
+ .addDb("fx")
+ .build().toProtoRequests().get(0))
+ .build();
+ assertTrue(defaultCatalogRequest.isTagMatch(defaultCatalogTag));
+ assertFalse("Must not evict buffers belonging to other_catalog",
+ defaultCatalogRequest.isTagMatch(otherCatalogTag));
+
+ // Request for a different catalog matches only tags from that catalog.
+ Request otherCatalogRequest = Builder.create()
+ .fromProtoRequest(Builder.create()
+ .addDb("other_catalog", "fx")
+ .build().toProtoRequests().get(0))
+ .build();
+ assertTrue(otherCatalogRequest.isTagMatch(otherCatalogTag));
+ assertFalse("Must not evict buffers belonging to the default catalog",
+ otherCatalogRequest.isTagMatch(defaultCatalogTag));
+
+ // A request for a DB that doesn't exist in the tags must not match,
regardless of catalog.
+ Request noMatchRequest = Builder.create()
+ .fromProtoRequest(Builder.create()
+ .addDb("any_catalog", "nonexistent_db")
+ .build().toProtoRequests().get(0))
+ .build();
+ assertFalse(noMatchRequest.isTagMatch(defaultCatalogTag));
+ assertFalse(noMatchRequest.isTagMatch(otherCatalogTag));
+ }
+
+  /**
+   * Verifies how isTagMatch treats Iceberg metadata-table cache tags
+   * (catalog.db.table.metaTable): dropping the base table evicts its metadata-table entries but not
+   * those of another table, and dropping a single partition never evicts them through prefix
+   * matching.
+   */
+  @Test
+  public void testIcebergMetaTableTagMatching() {
+    CacheTag baseTableTag = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME + ".salesdb.orders");
+    CacheTag filesMetaTag =
+        CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME + ".salesdb.orders.files");
+    CacheTag snapshotsMetaTag =
+        CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME + ".salesdb.orders.snapshots");
+    CacheTag otherTableMetaTag =
+        CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME + ".salesdb.other.files");
+
+    // Build requests through the shared helpers (proto round trip included), like the other tests.
+    Request dropTable = dropTableRequest(Warehouse.DEFAULT_CATALOG_NAME, "salesdb", "orders");
+    assertTrue(dropTable.isTagMatch(baseTableTag));
+    assertTrue(dropTable.isTagMatch(filesMetaTag));
+    assertTrue(dropTable.isTagMatch(snapshotsMetaTag));
+    assertFalse(dropTable.isTagMatch(otherTableMetaTag));
+
+    // Drop-partition requests must not evict metadata-table cache via prefix matching.
+    Request dropPartition = dropPartitionRequest(Warehouse.DEFAULT_CATALOG_NAME, "salesdb",
+        "orders", buildParts("dt", "2024-01-01"));
+    assertFalse(dropPartition.isTagMatch(filesMetaTag));
+  }
+
+  /**
+   * Cache tags produced before catalog support existed have only two parts (db.table) and carry no
+   * catalog component; they are expected to resolve to the default catalog.
+   */
+  @Test
+  public void testTwoPartLegacyTagMatching() {
+    final String defaultCatalog = Warehouse.DEFAULT_CATALOG_NAME;
+    CacheTag legacyTableTag = CacheTag.build("salesdb.orders");
+    CacheTag legacyPartitionTag = CacheTag.build("salesdb.orders", buildParts("dt", "2024-01-01"));
+
+    // Default-catalog drops of the database, the table, or the exact partition hit the legacy tags.
+    assertTrue(dropDbRequest(defaultCatalog, "salesdb").isTagMatch(legacyTableTag));
+    assertTrue(dropTableRequest(defaultCatalog, "salesdb", "orders").isTagMatch(legacyTableTag));
+    Request dropMatchingPartition =
+        dropPartitionRequest(defaultCatalog, "salesdb", "orders", buildParts("dt", "2024-01-01"));
+    assertTrue(dropMatchingPartition.isTagMatch(legacyPartitionTag));
+
+    // Dropping a different partition value leaves the legacy partition tag alone.
+    Request dropOtherPartition =
+        dropPartitionRequest(defaultCatalog, "salesdb", "orders", buildParts("dt", "2024-01-02"));
+    assertFalse(dropOtherPartition.isTagMatch(legacyPartitionTag));
+
+    // Requests scoped to a non-default catalog never touch default-catalog legacy tags.
+    assertFalse(dropDbRequest("custom_catalog", "salesdb").isTagMatch(legacyTableTag));
+    assertFalse(dropTableRequest("custom_catalog", "salesdb", "orders").isTagMatch(legacyTableTag));
+  }
+
+  /**
+   * Iceberg metadata-table tags (catalog.db.table.metaTable) that live in a non-default catalog may
+   * only be evicted by requests targeting that very catalog.
+   */
+  @Test
+  public void testNonDefaultCatalogIcebergMetaTableMatching() {
+    final String sparkCatalog = "spark_catalog";
+    CacheTag filesTag = CacheTag.build("spark_catalog.salesdb.orders.files");
+    CacheTag snapshotsTag = CacheTag.build("spark_catalog.salesdb.orders.snapshots");
+
+    // Same catalog, table drop: both metadata-table entries are evicted.
+    Request tableDropInSpark = dropTableRequest(sparkCatalog, "salesdb", "orders");
+    assertTrue(tableDropInSpark.isTagMatch(filesTag));
+    assertTrue(tableDropInSpark.isTagMatch(snapshotsTag));
+
+    // Same catalog, database drop: both metadata-table entries are evicted as well.
+    Request dbDropInSpark = dropDbRequest(sparkCatalog, "salesdb");
+    assertTrue(dbDropInSpark.isTagMatch(filesTag));
+    assertTrue(dbDropInSpark.isTagMatch(snapshotsTag));
+
+    // Identically named objects in the default catalog are a different scope and must not match.
+    Request tableDropInDefault =
+        dropTableRequest(Warehouse.DEFAULT_CATALOG_NAME, "salesdb", "orders");
+    assertFalse(tableDropInDefault.isTagMatch(filesTag));
+    Request dbDropInDefault = dropDbRequest(Warehouse.DEFAULT_CATALOG_NAME, "salesdb");
+    assertFalse(dbDropInDefault.isTagMatch(filesTag));
+  }
+
+  /**
+   * A database drop has to evict every cache entry under that database, covering base tables as
+   * well as their Iceberg metadata tables.
+   */
+  @Test
+  public void testDropDatabaseEvictsMetaTableTags() {
+    String qualifiedDb = Warehouse.DEFAULT_CATALOG_NAME + ".salesdb";
+    CacheTag ordersTag = CacheTag.build(qualifiedDb + ".orders");
+    CacheTag ordersFilesTag = CacheTag.build(qualifiedDb + ".orders.files");
+
+    Request salesDbDrop = dropDbRequest(Warehouse.DEFAULT_CATALOG_NAME, "salesdb");
+    assertTrue(salesDbDrop.isTagMatch(ordersTag));
+    assertTrue(salesDbDrop.isTagMatch(ordersFilesTag));
+
+    // Dropping an unrelated database leaves the salesdb entries untouched.
+    Request otherDbDrop = dropDbRequest(Warehouse.DEFAULT_CATALOG_NAME, "otherdb");
+    assertFalse(otherDbDrop.isTagMatch(ordersFilesTag));
+  }
+
+  /**
+   * The prefix match that lets a table drop reach its Iceberg metadata tables must not also reach a
+   * sibling table whose name merely starts with the dropped table's name (orders vs. orders_archive).
+   */
+  @Test
+  public void testDropTablePrefixMatchingAvoidsFalsePositives() {
+    String archiveName = Warehouse.DEFAULT_CATALOG_NAME + ".salesdb.orders_archive";
+    CacheTag archiveTableTag = CacheTag.build(archiveName);
+    CacheTag archiveFilesTag = CacheTag.build(archiveName + ".files");
+
+    Request ordersDrop = dropTableRequest(Warehouse.DEFAULT_CATALOG_NAME, "salesdb", "orders");
+    assertFalse(ordersDrop.isTagMatch(archiveTableTag));
+    assertFalse(ordersDrop.isTagMatch(archiveFilesTag));
+  }
+
+  /**
+   * Once tags are catalog-aware, a three-part name is read as catalog.db.table; it is never
+   * reinterpreted as a default-catalog db.table.metaTable.
+   */
+  @Test
+  public void testThreePartTagInterpretedAsCatalogQualified() {
+    CacheTag threePartTag = CacheTag.build("custom_catalog.salesdb.orders");
+
+    Request sameScopeDrop = dropTableRequest("custom_catalog", "salesdb", "orders");
+    assertTrue(sameScopeDrop.isTagMatch(threePartTag));
+
+    // Treating "custom_catalog" as a default-catalog database name must not produce a match.
+    Request misreadScopeDrop =
+        dropTableRequest(Warehouse.DEFAULT_CATALOG_NAME, "custom_catalog", "salesdb");
+    assertFalse(misreadScopeDrop.isTagMatch(threePartTag));
+  }
+
+  /**
+   * Snapshot-ref tags (branch_/tag_) must be evicted when their base table is dropped, both in the
+   * 4-part catalog-qualified form and in the legacy 3-part db.table.ref form. Snapshot refs of a
+   * different table, or of the same table name in another catalog, must be left alone.
+   */
+  @Test
+  public void testSnapshotRefTagMatching() {
+    CacheTag branchTag =
+        CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME + ".salesdb.orders.branch_main");
+    CacheTag tagRefTag = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME + ".salesdb.orders.tag_v1");
+    // Legacy 3-part snapshot ref without catalog prefix -> default-catalog db.table.ref.
+    CacheTag legacyBranchTag = CacheTag.build("salesdb.orders.branch_main");
+    // Same ref name, but attached to another table or living in another catalog.
+    CacheTag otherTableBranchTag =
+        CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME + ".salesdb.other.branch_main");
+    CacheTag otherCatalogBranchTag = CacheTag.build("spark_catalog.salesdb.orders.branch_main");
+
+    Request dropTable = dropTableRequest(Warehouse.DEFAULT_CATALOG_NAME, "salesdb", "orders");
+    assertTrue(dropTable.isTagMatch(branchTag));
+    assertTrue(dropTable.isTagMatch(tagRefTag));
+    assertTrue(dropTable.isTagMatch(legacyBranchTag));
+
+    // Negative cases: dropping salesdb.orders in the default catalog must not evict these.
+    assertFalse(dropTable.isTagMatch(otherTableBranchTag));
+    assertFalse(dropTable.isTagMatch(otherCatalogBranchTag));
+  }
+
+  /**
+   * Only cache tag names made of 2 to 4 dot-separated components are accepted; isTagMatch rejects
+   * any other length with an UnsupportedOperationException.
+   */
+  @Test
+  public void testInvalidCacheTagLengthThrows() {
+    Request salesDbDrop = dropDbRequest(Warehouse.DEFAULT_CATALOG_NAME, "salesdb");
+
+    // Too short: a bare table name without any database qualifier.
+    assertThrows(UnsupportedOperationException.class,
+        () -> salesDbDrop.isTagMatch(CacheTag.build("orders")));
+
+    // Too long: five components exceed the catalog.db.table.metaTable shape.
+    assertThrows(UnsupportedOperationException.class,
+        () -> salesDbDrop.isTagMatch(CacheTag.build("a.b.c.d.e")));
+  }
+
+  /** Builds a drop-database eviction request for {@code catalog.db}, round-tripped through proto. */
+  private static Request dropDbRequest(String catalog, String db) {
+    Builder dbDrop = Builder.create().addDb(catalog, db);
+    return roundTrip(dbDrop);
+  }
+
+  /** Builds a drop-table eviction request for {@code catalog.db.table}, round-tripped through proto. */
+  private static Request dropTableRequest(String catalog, String db, String table) {
+    Builder tableDrop = Builder.create().addTable(catalog, db, table);
+    return roundTrip(tableDrop);
+  }
+
+  /**
+   * Builds a drop-partition eviction request for one partition of {@code catalog.db.table},
+   * round-tripped through proto.
+   */
+  private static Request dropPartitionRequest(String catalog, String db, String table,
+      Map<String, String> partSpec) {
+    Builder partitionDrop = Builder.create().addPartitionOfATable(catalog, db, table, partSpec);
+    return roundTrip(partitionDrop);
+  }
+
+  /**
+   * Serializes the built request to its protobuf form and parses the first proto message back into
+   * a Request, the same way a request reaches an LLAP daemon through evictEntity.
+   */
+  private static Request roundTrip(Builder requestBuilder) {
+    Request original = requestBuilder.build();
+    Builder decoder = Builder.create();
+    decoder.fromProtoRequest(original.toProtoRequests().get(0));
+    return decoder.build();
+  }
+
@Test
public void testProactiveSweep() throws Exception {
closeSweeperExecutorForTest();
diff --git
a/llap-server/src/test/org/apache/hadoop/hive/llap/io/api/impl/TestLlapCacheMetadataSerializer.java
b/llap-server/src/test/org/apache/hadoop/hive/llap/io/api/impl/TestLlapCacheMetadataSerializer.java
index 01581dd94a7..6082f57d065 100644
---
a/llap-server/src/test/org/apache/hadoop/hive/llap/io/api/impl/TestLlapCacheMetadataSerializer.java
+++
b/llap-server/src/test/org/apache/hadoop/hive/llap/io/api/impl/TestLlapCacheMetadataSerializer.java
@@ -107,7 +107,7 @@ private LlapDaemonProtocolProtos.CacheEntryList
createDummyMetadata() throws IOE
LlapDaemonProtocolProtos.CacheEntryRange re2 =
LlapDaemonProtocolProtos.CacheEntryRange.newBuilder().setStart(14L).setEnd(38L).build();
LlapDaemonProtocolProtos.CacheTag ct =
-
LlapDaemonProtocolProtos.CacheTag.newBuilder().setTableName("dummyTable").build();
+
LlapDaemonProtocolProtos.CacheTag.newBuilder().setTableName("hive.default.dummyTable").build();
Path path = new Path(TEST_PATH);
SyntheticFileId syntheticFileId = fileId(path);
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java
b/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java
index ba62b8d89c2..76034ca69a5 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java
@@ -27,6 +27,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.io.CacheTag;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
import org.apache.hadoop.hive.ql.io.HdfsUtils;
@@ -74,18 +75,21 @@ public static PartitionDesc partitionDescForPath(Path path,
Map<Path, PartitionD
return part;
}
- public static CacheTag getDbAndTableNameForMetrics(Path path, boolean
includeParts,
- PartitionDesc part) {
-
- // Fallback to legacy cache tag creation logic.
+ /**
+ * Builds a {@link CacheTag} for the given path and partition descriptor.
+ * The catalog name is derived from the {@link PartitionDesc} when
available, falling back
+ * to {@link Warehouse#DEFAULT_CATALOG_NAME} when {@code part} is null.
+ */
+ public static CacheTag getCacheTag(Path path, boolean includeParts,
PartitionDesc part) {
if (part == null) {
- return CacheTag.build(LlapUtil.getDbAndTableNameForMetrics(path,
includeParts));
+ return CacheTag.build(
+ Warehouse.DEFAULT_CATALOG_NAME,
LlapUtil.getDbAndTableNameForMetrics(path, includeParts));
}
-
+ String catalogName = part.getCatalogName();
if (!includeParts || !part.isPartitioned()) {
- return CacheTag.build(part.getTableName());
+ return CacheTag.build(catalogName, part.getTableName());
} else {
- return CacheTag.build(part.getTableName(), part.getPartSpec());
+ return CacheTag.build(catalogName, part.getTableName(),
part.getPartSpec());
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/ProactiveEviction.java
b/ql/src/java/org/apache/hadoop/hive/llap/ProactiveEviction.java
index 120949fc994..b1fcf31a28f 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/ProactiveEviction.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/ProactiveEviction.java
@@ -21,9 +21,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -40,6 +38,7 @@
import org.apache.hadoop.hive.llap.impl.LlapManagementProtocolClientImpl;
import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
+import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.net.NetUtils;
@@ -139,7 +138,8 @@ public void run() {
long evictedBytes = 0;
for (LlapDaemonProtocolProtos.EvictEntityRequestProto protoRequest :
protoRequests) {
- LOG.debug("Requesting proactive eviction for entities in database
{}", protoRequest.getDbName());
+ LOG.debug("Requesting proactive eviction for entities in catalog {},
database {}",
+ protoRequest.getCatalogName(), protoRequest.getDbName());
LlapDaemonProtocolProtos.EvictEntityResponseProto response =
client.evictEntity(null, protoRequest);
evictedBytes += response.getEvictedBytes();
LOG.debug("Proactively evicted {} bytes",
response.getEvictedBytes());
@@ -152,19 +152,19 @@ public void run() {
}
/**
- * Holds information on entities: DB name(s), table name(s), partitions.
+ * Holds information on entities: catalog name(s), DB name(s), table
name(s), partitions.
*/
public static final class Request {
- // Holds a hierarchical structure of DBs, tables and partitions such as:
- // { testdb : { testtab0 : [], testtab1 : [ {pk0 : p0v0, pk1 : p0v1}, {pk0
: p1v0, pk1 : p1v1} ] }, testdb2 : {} }
- private final Map<String, Map<String, Set<LinkedHashMap<String, String>>>>
entities;
+ public record PartitionSpec(Map<String, String> spec) {}
+ public record CatalogDb(String catalog, String database){}
+ private final Map<CatalogDb, Map<String, Set<PartitionSpec>>> entities;
- private Request(Map<String, Map<String, Set<LinkedHashMap<String,
String>>>> entities) {
+ private Request(Map<CatalogDb, Map<String, Set<PartitionSpec>>> entities) {
this.entities = entities;
}
- public Map<String, Map<String, Set<LinkedHashMap<String, String>>>>
getEntities() {
+ public Map<CatalogDb, Map<String, Set<PartitionSpec>>> getEntities() {
return entities;
}
@@ -172,15 +172,8 @@ public boolean isEmpty() {
return entities.isEmpty();
}
- /**
- * Request often times only contains tables/partitions of 1 DB only.
- * @return the single DB name, null if the count of DBs present is not
exactly 1.
- */
- public String getSingleDbName() {
- if (entities.size() == 1) {
- return entities.keySet().stream().findFirst().get();
- }
- return null;
+ public boolean hasDatabaseName(String catalogName, String dbName) {
+ return entities.containsKey(new CatalogDb(catalogName, dbName));
}
/**
@@ -188,41 +181,39 @@ public String getSingleDbName() {
* @return list of request instances ready to be sent over protobuf.
*/
public List<LlapDaemonProtocolProtos.EvictEntityRequestProto>
toProtoRequests() {
-
- List<LlapDaemonProtocolProtos.EvictEntityRequestProto> protoRequests =
new LinkedList<>();
-
- for (Map.Entry<String, Map<String, Set<LinkedHashMap<String, String>>>>
dbEntry : entities.entrySet()) {
- String dbName = dbEntry.getKey();
- Map<String, Set<LinkedHashMap<String, String>>> tables =
dbEntry.getValue();
-
- LlapDaemonProtocolProtos.EvictEntityRequestProto.Builder
requestBuilder =
- LlapDaemonProtocolProtos.EvictEntityRequestProto.newBuilder();
- LlapDaemonProtocolProtos.TableProto.Builder tableBuilder = null;
-
- requestBuilder.setDbName(dbName.toLowerCase());
- for (Map.Entry<String, Set<LinkedHashMap<String, String>>> tableEntry
: tables.entrySet()) {
- String tableName = tableEntry.getKey();
- tableBuilder = LlapDaemonProtocolProtos.TableProto.newBuilder();
- tableBuilder.setTableName(tableName.toLowerCase());
-
- Set<LinkedHashMap<String, String>> partitions =
tableEntry.getValue();
- Set<String> partitionKeys = null;
-
- for (Map<String, String> partitionSpec : partitions) {
- if (partitionKeys == null) {
+ return entities.entrySet().stream()
+ .map(entry -> {
+ CatalogDb catalogDb = entry.getKey();
+ Map<String, Set<PartitionSpec>> tables = entry.getValue();
+ LlapDaemonProtocolProtos.EvictEntityRequestProto.Builder
requestBuilder =
+ LlapDaemonProtocolProtos.EvictEntityRequestProto.newBuilder();
+
+ requestBuilder.setCatalogName(catalogDb.catalog().toLowerCase());
+ requestBuilder.setDbName(catalogDb.database().toLowerCase());
+
+ tables.forEach((tableName, partitions) -> {
+ LlapDaemonProtocolProtos.TableProto.Builder tableBuilder =
+ LlapDaemonProtocolProtos.TableProto.newBuilder();
+
+ tableBuilder.setTableName(tableName.toLowerCase());
+
+ Set<String> partitionKeys = null;
+
+ for (PartitionSpec partitionSpec : partitions) {
+ if (partitionKeys == null) {
+ partitionKeys = new
LinkedHashSet<>(partitionSpec.spec().keySet());
+ tableBuilder.addAllPartKey(partitionKeys);
+ }
+ for (String partKey : tableBuilder.getPartKeyList()) {
+ tableBuilder.addPartVal(partitionSpec.spec().get(partKey));
+ }
+ }
// For a given table the set of partition columns (keys) should
not change.
- partitionKeys = new LinkedHashSet<>(partitionSpec.keySet());
- tableBuilder.addAllPartKey(partitionKeys);
- }
- for (String partKey : tableBuilder.getPartKeyList()) {
- tableBuilder.addPartVal(partitionSpec.get(partKey));
- }
- }
- requestBuilder.addTable(tableBuilder.build());
- }
- protoRequests.add(requestBuilder.build());
- }
- return protoRequests;
+ requestBuilder.addTable(tableBuilder.build());
+ });
+ return requestBuilder.build();
+ })
+ .toList();
}
/**
@@ -233,19 +224,19 @@ public
List<LlapDaemonProtocolProtos.EvictEntityRequestProto> toProtoRequests()
* @return true if cacheTag matches and the related buffer is eligible for
proactive eviction, false otherwise.
*/
public boolean isTagMatch(CacheTag cacheTag) {
- String db = getSingleDbName();
- if (db == null) {
- // Number of DBs in the request was not exactly 1.
- throw new UnsupportedOperationException("Predicate only implemented
for 1 DB case.");
- }
- TableName tagTableName = TableName.fromString(cacheTag.getTableName(),
null, null);
-
- // Check against DB.
- if (!db.equals(tagTableName.getDb())) {
+ // Parse the tag once and derive catalog/db from the parsed result, so
that 2-part
+ // (db.table), 3-part (catalog.db.table), 4-part
(catalog.db.table.metaTable) and
+ // snapshot-ref names are all interpreted consistently.
+ TableName tagTableName = parseCacheTagTableName(cacheTag.getTableName());
+ String catalog = tagTableName.getCat();
+ String db = tagTableName.getDb();
+
+ // Check that the tag's catalog and database is present in the eviction
request.
+ if (!entities.containsKey(new CatalogDb(catalog, db))) {
return false;
}
- Map<String, Set<LinkedHashMap<String, String>>> tables =
entities.get(db);
+ Map<String, Set<PartitionSpec>> tables = entities.getOrDefault(new
CatalogDb(catalog, db), Map.of());
// If true, must be a drop DB event and this cacheTag matches.
if (tables.isEmpty()) {
@@ -257,31 +248,60 @@ public boolean isTagMatch(CacheTag cacheTag) {
tagPartDescMap = ((CacheTag.PartitionCacheTag)
cacheTag).getPartitionDescMap();
}
+ String tagDbTable = tagTableName.getNotEmptyDbTable();
// Check against table name.
- for (String tableAndDbName : tables.keySet()) {
- if (tableAndDbName.equals(tagTableName.getNotEmptyDbTable())) {
-
- Set<LinkedHashMap<String, String>> partDescs =
tables.get(tableAndDbName);
-
- // If true, must be a drop table event, and this cacheTag matches.
- if (partDescs == null) {
- return true;
+ for (Map.Entry<String, Set<PartitionSpec>> tableEntry :
tables.entrySet()) {
+ String tableAndDbName = tableEntry.getKey();
+ Set<PartitionSpec> partDescs = tableEntry.getValue();
+ if (!tableAndDbName.equals(tagDbTable)) {
+ // Drop-table requests use db.table; Iceberg metadata tables are
tagged db.table.metaTable.
+ if (partDescs != null || !tagDbTable.startsWith(tableAndDbName +
".")) {
+ continue;
}
+ return true;
+ }
- // Check against partition keys and values and alas for drop
partition event.
- if (!(cacheTag instanceof CacheTag.PartitionCacheTag)) {
- throw new IllegalArgumentException("CacheTag has no partition
information, while trying" +
- " to evict due to (and based on) a drop partition DDL
statement..");
- }
+ // If true, must be a drop table event, and this cacheTag matches.
+ if (partDescs == null) {
+ return true;
+ }
- if (partDescs.contains(tagPartDescMap)) {
- return true;
- }
+ // Check against partition keys and values and alas for drop partition
event.
+ if (!(cacheTag instanceof CacheTag.PartitionCacheTag)) {
+ throw new IllegalArgumentException("CacheTag has no partition
information, while trying" +
+ " to evict due to (and based on) a drop partition DDL
statement..");
+ }
+
+ if (partDescs.contains(new PartitionSpec(tagPartDescMap))) {
+ return true;
}
}
return false;
}
+    /**
+     * Splits a cache-tag table name on '.' and maps the pieces onto a {@link TableName}.
+     * Accepted shapes:
+     * <ul>
+     *   <li>{@code db.table}: legacy tag, assigned to {@link Warehouse#DEFAULT_CATALOG_NAME};</li>
+     *   <li>{@code db.table.ref} where {@code ref} matches {@link TableName#SNAPSHOT_REF}: legacy
+     *       snapshot-ref tag, also assigned to the default catalog;</li>
+     *   <li>{@code catalog.db.table}: any other 3-part name;</li>
+     *   <li>{@code catalog.db.table.metaTable}: Iceberg metadata table or snapshot ref.</li>
+     * </ul>
+     *
+     * @param fullTableName the table name stored in the cache tag
+     * @return the parsed name
+     * @throws UnsupportedOperationException if the name does not split into 2 to 4 components
+     */
+    private static TableName parseCacheTagTableName(String fullTableName) {
+      String[] parts = fullTableName.split("\\.");
+      int count = parts.length;
+      if (count < 2 || count > 4) {
+        throw new UnsupportedOperationException(
+            "Cache tag table name must have 2-4 dot-separated components: " + fullTableName);
+      }
+      if (count == 2) {
+        return new TableName(Warehouse.DEFAULT_CATALOG_NAME, parts[0], parts[1], null);
+      }
+      if (count == 4) {
+        return new TableName(parts[0], parts[1], parts[2], parts[3]);
+      }
+      // Three parts: a trailing snapshot ref means legacy db.table.ref, otherwise catalog.db.table.
+      boolean legacySnapshotRef = TableName.SNAPSHOT_REF.matcher(parts[2]).matches();
+      return legacySnapshotRef
+          ? new TableName(Warehouse.DEFAULT_CATALOG_NAME, parts[0], parts[1], parts[2])
+          : new TableName(parts[0], parts[1], parts[2], null);
+    }
+
@Override
public String toString() {
return "Request { entities = " + entities + " }";
@@ -292,7 +312,7 @@ public String toString() {
*/
public static final class Builder {
- private final Map<String, Map<String, Set<LinkedHashMap<String,
String>>>> entities;
+ private final Map<CatalogDb, Map<String, Set<PartitionSpec>>> entities;
private Builder() {
this.entities = new HashMap<>();
@@ -302,45 +322,64 @@ public static Builder create() {
return new Builder();
}
- public Builder addPartitionOfATable(String db, String tableName,
LinkedHashMap<String, String> partSpec) {
- ensureDb(db);
- ensureTable(db, tableName);
- entities.get(db).get(tableName).add(partSpec);
+ /**
+ * Add a partition of a table scoped to the given catalog.
+ */
+ public Builder addPartitionOfATable(String catalog, String db, String
tableName,
+ Map<String, String> partSpec) {
+ ensureTable(catalog, db, tableName);
+ entities.get(new CatalogDb(catalog, db)).get(tableName).add(new
PartitionSpec(partSpec));
return this;
}
+ /**
+ * Add a partition of a table scoped to the default catalog.
+ */
+ public Builder addPartitionOfATable(String db, String tableName,
Map<String, String> partSpec) {
+ return addPartitionOfATable(Warehouse.DEFAULT_CATALOG_NAME, db,
tableName, partSpec);
+ }
+
+ /**
+ * Add a database scoped to the given catalog.
+ */
+ public Builder addDb(String catalog, String db) {
+ ensureDb(catalog, db);
+ return this;
+ }
+
+ /**
+ * Add a database scoped to the default catalog.
+ */
public Builder addDb(String db) {
- ensureDb(db);
+ return addDb(Warehouse.DEFAULT_CATALOG_NAME, db);
+ }
+
+ /**
+ * Add a table scoped to the given catalog.
+ */
+ public Builder addTable(String catalog, String db, String table) {
+ ensureTable(catalog, db, table);
return this;
}
+ /**
+ * Add a table scoped to the default catalog.
+ */
public Builder addTable(String db, String table) {
- ensureDb(db);
- ensureTable(db, table);
- return this;
+ return addTable(Warehouse.DEFAULT_CATALOG_NAME, db, table);
}
public Request build() {
return new Request(entities);
}
- private void ensureDb(String dbName) {
- Map<String, Set<LinkedHashMap<String, String>>> tables =
entities.get(dbName);
- if (tables == null) {
- tables = new HashMap<>();
- entities.put(dbName, tables);
- }
+ private void ensureDb(String catalogName, String dbName) {
+ entities.computeIfAbsent(new CatalogDb(catalogName, dbName), k -> new
HashMap<>());
}
- private void ensureTable(String dbName, String tableName) {
- ensureDb(dbName);
- Map<String, Set<LinkedHashMap<String, String>>> tables =
entities.get(dbName);
-
- Set<LinkedHashMap<String, String>> partitions = tables.get(tableName);
- if (partitions == null) {
- partitions = new HashSet<>();
- tables.put(tableName, partitions);
- }
+ private void ensureTable(String catalogName, String dbName, String
tableName) {
+ ensureDb(catalogName, dbName);
+ entities.get(new CatalogDb(catalogName,
dbName)).computeIfAbsent(tableName, k -> new HashSet<>());
}
/**
@@ -350,9 +389,10 @@ private void ensureTable(String dbName, String tableName) {
*/
public Builder
fromProtoRequest(LlapDaemonProtocolProtos.EvictEntityRequestProto protoRequest)
{
entities.clear();
+ String catalogName = protoRequest.getCatalogName().toLowerCase();
String dbName = protoRequest.getDbName().toLowerCase();
- Map<String, Set<LinkedHashMap<String, String>>> entitiesInDb = new
HashMap<>();
+ Map<String, Set<PartitionSpec>> entitiesInDb = new HashMap<>();
List<LlapDaemonProtocolProtos.TableProto> tables =
protoRequest.getTableList();
if (tables != null && !tables.isEmpty()) {
@@ -364,8 +404,8 @@ public Builder
fromProtoRequest(LlapDaemonProtocolProtos.EvictEntityRequestProto
entitiesInDb.put(dbAndTableName, null);
continue;
}
- Set<LinkedHashMap<String, String>> partitions = new HashSet<>();
- LinkedHashMap<String, String> partDesc = new LinkedHashMap<>();
+ Set<PartitionSpec> partitions = new HashSet<>();
+ Map<String, String> partDesc = new HashMap<>();
for (int valIx = 0; valIx < table.getPartValCount(); ++valIx) {
int keyIx = valIx % table.getPartKeyCount();
@@ -373,15 +413,15 @@ public Builder
fromProtoRequest(LlapDaemonProtocolProtos.EvictEntityRequestProto
partDesc.put(table.getPartKey(keyIx).toLowerCase(),
table.getPartVal(valIx));
if (keyIx == table.getPartKeyCount() - 1) {
- partitions.add(partDesc);
- partDesc = new LinkedHashMap<>();
+ partitions.add(new PartitionSpec(partDesc));
+ partDesc = new HashMap<>();
}
}
entitiesInDb.put(dbAndTableName, partitions);
}
}
- entities.put(dbName, entitiesInDb);
+ entities.put(new CatalogDb(catalogName, dbName), entitiesInDb);
return this;
}
}
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/ddl/database/drop/DropDatabaseAnalyzer.java
b/ql/src/java/org/apache/hadoop/hive/ql/ddl/database/drop/DropDatabaseAnalyzer.java
index 11f8917334b..256b3456ff7 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/ddl/database/drop/DropDatabaseAnalyzer.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/ddl/database/drop/DropDatabaseAnalyzer.java
@@ -67,6 +67,9 @@ public void analyzeInternal(ASTNode root) throws
SemanticException {
if (database == null) {
return;
}
+ if (catalogName == null) {
+ catalogName = database.getCatalogName();
+ }
// if cascade=true, then we need to authorize the drop table action as
well, and add the tables to the outputs
boolean isDbLevelLock = true;
if (cascade) {
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/ddl/database/drop/DropDatabaseOperation.java
b/ql/src/java/org/apache/hadoop/hive/ql/ddl/database/drop/DropDatabaseOperation.java
index b544b7b4a24..fe4d153f1d2 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/ddl/database/drop/DropDatabaseOperation.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/ddl/database/drop/DropDatabaseOperation.java
@@ -53,7 +53,7 @@ public int execute() throws HiveException {
if (LlapHiveUtils.isLlapMode(context.getConf())) {
ProactiveEviction.Request.Builder llapEvictRequestBuilder =
ProactiveEviction.Request.Builder.create();
- llapEvictRequestBuilder.addDb(dbName); // TODO catalog. add catalog
for the cache. Depend on HIVE-29281
+ llapEvictRequestBuilder.addDb(catName, dbName);
ProactiveEviction.evict(context.getConf(),
llapEvictRequestBuilder.build());
}
// Unregister the functions as well
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/drop/DropTableOperation.java
b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/drop/DropTableOperation.java
index b253ec5df5f..7e6c93ebdca 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/drop/DropTableOperation.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/drop/DropTableOperation.java
@@ -115,7 +115,7 @@ public int execute() throws HiveException {
if (LlapHiveUtils.isLlapMode(context.getConf())) {
TableName tableName = HiveTableName.of(table);
ProactiveEviction.Request.Builder llapEvictRequestBuilder =
ProactiveEviction.Request.Builder.create();
- llapEvictRequestBuilder.addTable(tableName.getDb(),
tableName.getTable());
+ llapEvictRequestBuilder.addTable(table.getCatName(), tableName.getDb(),
tableName.getTable());
ProactiveEviction.evict(context.getConf(),
llapEvictRequestBuilder.build());
}
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AlterTableDropPartitionOperation.java
b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AlterTableDropPartitionOperation.java
index a0eda1ab4ef..89ad090efb5 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AlterTableDropPartitionOperation.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AlterTableDropPartitionOperation.java
@@ -147,7 +147,8 @@ private void dropPartitions(boolean isRepl) throws
HiveException {
DDLUtils.addIfAbsentByName(new WriteEntity(partition,
WriteEntity.WriteType.DDL_NO_LOCK), context);
if (llapEvictRequestBuilder != null) {
- llapEvictRequestBuilder.addPartitionOfATable(tableName.getDb(),
tableName.getTable(), partition.getSpec());
+ llapEvictRequestBuilder.addPartitionOfATable(
+ tableName.getCat(), tableName.getDb(), tableName.getTable(),
partition.getSpec());
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index a29e532b113..f58fafc8556 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -770,8 +770,7 @@ public static TableDesc getTableDesc(Table tbl) {
if (tbl.getSnapshotRef() != null) {
props.put(SNAPSHOT_REF, tbl.getSnapshotRef());
}
- return (new TableDesc(tbl.getInputFormatClass(), tbl
- .getOutputFormatClass(), props));
+ return new TableDesc(tbl.getInputFormatClass(),
tbl.getOutputFormatClass(), props, tbl.getCatName());
}
// column names and column types are all delimited by comma
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index d46eded36e6..9248897bf74 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -275,7 +275,7 @@ private VectorizedOrcAcidRowBatchReader(JobConf conf,
OrcSplit orcSplit, Reporte
}
PartitionDesc partitionDesc =
LlapHiveUtils.partitionDescForPath(orcSplit.getPath(),
mapWork.getPathToPartitionInfo());
- cacheTag = LlapHiveUtils.getDbAndTableNameForMetrics(orcSplit.getPath(),
true, partitionDesc);
+ cacheTag = LlapHiveUtils.getCacheTag(orcSplit.getPath(), true,
partitionDesc);
} else {
cacheTag = null;
}
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index fbffab44a64..c346331325d 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -343,7 +343,7 @@ public static CacheTag cacheTagOfParquetFile(Path path,
Configuration cacheConf,
return null;
}
PartitionDesc partitionDesc = LlapHiveUtils.partitionDescForPath(path,
mapWork.getPathToPartitionInfo());
- return LlapHiveUtils.getDbAndTableNameForMetrics(path, true,
partitionDesc);
+ return LlapHiveUtils.getCacheTag(path, true, partitionDesc);
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
b/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
index 0dcfe72d7f5..25f8afe6d6c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
@@ -258,6 +258,13 @@ public String getTableName() {
return tableName;
}
+  /**
+   * Returns the name of the catalog that owns this partition's table, as reported by the
+   * partition's {@link TableDesc}.
+   */
+  public String getCatalogName() {
+    final TableDesc owningTable = this.tableDesc;
+    return owningTable.getCatalogName();
+  }
+
@Explain(displayName = "input format", explainLevels = { Level.USER,
Level.DEFAULT, Level.EXTENDED })
public String getInputFileFormatClassName() {
return getInputFileFormatClass().getName();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
index 58ce207c0c6..b0e6feff7ec 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
@@ -20,6 +20,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.StringInternUtils;
+import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
@@ -52,6 +53,8 @@ public class TableDesc implements Serializable, Cloneable {
public static final String SECRET_PREFIX = "TABLE_SECRET";
public static final String SECRET_DELIMIT = "#";
+ private String catalogName;
+
public TableDesc() {
}
@@ -59,14 +62,31 @@ public TableDesc() {
* @param inputFormatClass
* @param outputFormatClass
* @param properties must contain serde class name associate with this table.
+ * @param catalogName the catalog this table belongs to; stored as a
dedicated field so it does
+ * not appear in EXPLAIN output. Pass {@code null} for
internal/intermediate
+ * descriptors that are not backed by a real user table;
{@code null} will be
+ * normalized to {@link Warehouse#DEFAULT_CATALOG_NAME}.
*/
public TableDesc(
final Class<? extends InputFormat> inputFormatClass,
- final Class<?> outputFormatClass, final Properties properties) {
+ final Class<?> outputFormatClass, final Properties properties,
+ final String catalogName) {
this.inputFileFormatClass = inputFormatClass;
outputFileFormatClass = HiveFileFormatUtils
.getOutputFormatSubstitute(outputFormatClass);
setProperties(properties);
+ this.catalogName = catalogName == null ? Warehouse.DEFAULT_CATALOG_NAME :
catalogName;
+ }
+
+ /**
+ * Creates a descriptor assigned to the default catalog; equivalent to calling the
+ * four-argument constructor with a {@code null} catalog name.
+ *
+ * @param inputFormatClass
+ * @param outputFormatClass
+ * @param properties must contain the serde class name associated with this table.
+ */
public TableDesc(
final Class<? extends InputFormat> inputFormatClass,
final Class<?> outputFormatClass, final Properties properties) {
+ this(inputFormatClass, outputFormatClass, properties, null);
}
public Class<? extends AbstractSerDe> getSerDeClass() {
@@ -199,6 +219,14 @@ public int getBucketingVersion() {
properties.getProperty(hive_metastoreConstants.TABLE_BUCKETING_VERSION));
}
+ /**
+ * Returns the catalog this table belongs to; never {@code null}. The no-arg
+ * constructor (e.g. {@code new TableDesc()}, as used by {@link #clone()}) leaves
+ * the field unset, so a missing value is reported as
+ * {@link Warehouse#DEFAULT_CATALOG_NAME}, matching the normalization applied by
+ * the constructors and {@link #setCatalogName(String)}.
+ */
+ public String getCatalogName() {
+ return catalogName == null ? Warehouse.DEFAULT_CATALOG_NAME : catalogName;
+ }
+
+ /**
+ * Sets the catalog this table belongs to.
+ *
+ * @param catalogName the catalog name; {@code null} is normalized to
+ * {@link Warehouse#DEFAULT_CATALOG_NAME}
+ */
+ public void setCatalogName(String catalogName) {
+ this.catalogName = catalogName == null ? Warehouse.DEFAULT_CATALOG_NAME :
catalogName;
+ }
+
+
@Override
public Object clone() {
TableDesc ret = new TableDesc();
@@ -215,6 +243,7 @@ public Object clone() {
if (jobProperties != null) {
ret.jobProperties = new LinkedHashMap<String, String>(jobProperties);
}
+ ret.catalogName = catalogName == null ? Warehouse.DEFAULT_CATALOG_NAME :
catalogName;
return ret;
}
diff --git a/ql/src/test/results/clientpositive/llap/db_ddl_explain.q.out
b/ql/src/test/results/clientpositive/llap/db_ddl_explain.q.out
index 257d69751fd..3dc0c6bbc39 100644
--- a/ql/src/test/results/clientpositive/llap/db_ddl_explain.q.out
+++ b/ql/src/test/results/clientpositive/llap/db_ddl_explain.q.out
@@ -159,6 +159,7 @@ STAGE DEPENDENCIES:
STAGE PLANS:
Stage: Stage-0
Drop Database
+ catalog: hive
database: d
if exists: false
diff --git
a/storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java
b/storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java
index 0f5d7b91516..66ef19e1d35 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java
@@ -30,17 +30,19 @@
/**
* Used for identifying the related object of the buffer stored in cache.
* Comes in 3 flavours to optimize for minimal memory overhead:
- * - TableCacheTag for tables without partitions: DB/table level
- * - SinglePartitionCacheTag for tables with 1 partition level:
DB/table/1st_partition
+ * - TableCacheTag for tables without partitions: catalog.DB.table level
+ * - SinglePartitionCacheTag for tables with 1 partition level:
catalog.DB.table/1st_partition
* - MultiPartitionCacheTag for tables with > 1 partition levels:
- * DB/table/1st_partition/.../nth_partition .
+ * catalog.DB.table/1st_partition/.../nth_partition .
*/
public abstract class CacheTag implements Comparable<CacheTag> {
private static final String ENCODING = "UTF-8";
/**
- * Prepended by DB name and '.' .
+ * Catalog-qualified, DB-qualified table name. Stored as {@code
catalog.db.table}, e.g.
+ * {@code hive.salesdb.orders}. For DB-level parent tags produced by
+ * {@link #createParentCacheTag} this is just {@code catalog.db}.
*/
protected final String tableName;
@@ -48,6 +50,9 @@ private CacheTag(String tableName) {
this.tableName = tableName.intern();
}
+ /**
+ * Returns the full catalog-qualified, DB-qualified name, i.e. {@code
catalog.db.table}.
+ */
public String getTableName() {
return tableName;
}
@@ -71,8 +76,7 @@ public boolean equals(Object obj) {
@Override
public int hashCode() {
- int res = tableName.hashCode();
- return res;
+ return tableName.hashCode();
}
public static final CacheTag build(String tableName) {
@@ -82,8 +86,16 @@ public static final CacheTag build(String tableName) {
return new TableCacheTag(tableName);
}
- public static final CacheTag build(String tableName, LinkedHashMap<String,
String> partDescMap) {
- if (StringUtils.isEmpty(tableName) || partDescMap == null ||
partDescMap.isEmpty()) {
+ /**
+ * Builds a tag from a catalog name and a {@code db.table} name by joining them
+ * into {@code catalog.db.table} and delegating to {@link #build(String)}.
+ * NOTE(review): the catalog is concatenated as-is, so a {@code null} catalog
+ * produces a {@code "null."} prefix — callers are expected to pass a resolved catalog.
+ *
+ * @param catalogName catalog the table belongs to
+ * @param dbAndTableName DB-qualified table name, {@code db.table}
+ */
+ public static final CacheTag build(String catalogName, String
dbAndTableName) {
+ return build(catalogName + "." + dbAndTableName);
+ }
+
+ /**
+ * Builds a partition-level tag for {@code catalog.db.table} by joining the catalog
+ * and DB-qualified table name, then delegating to
+ * {@link #build(String, LinkedHashMap)}, which throws
+ * {@link IllegalArgumentException} when {@code partDescMap} is {@code null} or empty.
+ *
+ * @param catalogName catalog the table belongs to (concatenated as-is)
+ * @param dbAndTableName DB-qualified table name, {@code db.table}
+ * @param partDescMap ordered partition column to value mapping
+ */
+ public static final CacheTag build(String catalogName, String
dbAndTableName, LinkedHashMap<String, String> partDescMap) {
+ return build(catalogName + "." + dbAndTableName, partDescMap);
+ }
+
+ public static final CacheTag build(String fullTableName,
LinkedHashMap<String, String> partDescMap) {
+ if (StringUtils.isEmpty(fullTableName) || partDescMap == null ||
partDescMap.isEmpty()) {
throw new IllegalArgumentException();
}
@@ -95,10 +107,10 @@ public static final CacheTag build(String tableName,
LinkedHashMap<String, Strin
}
if (partDescs.length == 1) {
- return new SinglePartitionCacheTag(tableName, partDescs[0]);
+ return new SinglePartitionCacheTag(fullTableName, partDescs[0]);
} else {
// In this case it must be >1
- return new MultiPartitionCacheTag(tableName, partDescs);
+ return new MultiPartitionCacheTag(fullTableName, partDescs);
}
}
@@ -118,7 +130,10 @@ public static final CacheTag build(String tableName,
List<String> partDescs) {
/**
* Constructs a (fake) parent CacheTag instance by walking back in the
hierarchy i.e. stepping
* from inner to outer partition levels, then producing a CacheTag for the
table and finally
- * the DB.
+ * the DB. The catalog prefix is preserved throughout the walk.
+ *
+ * <p>The walk terminates at the DB level: a tag whose {@code tableName}
contains exactly one
+ * dot (i.e. {@code catalog.db}) has no parent, so {@code null} is returned.
*/
public static final CacheTag createParentCacheTag(CacheTag tag) {
if (tag == null) {
@@ -134,20 +149,18 @@ public static final CacheTag
createParentCacheTag(CacheTag tag) {
}
return new MultiPartitionCacheTag(multiPartitionCacheTag.tableName,
subList);
} else {
- return new SinglePartitionCacheTag(multiPartitionCacheTag.tableName,
- multiPartitionCacheTag.partitionDesc[0]);
+ return new SinglePartitionCacheTag(
+ multiPartitionCacheTag.tableName,
multiPartitionCacheTag.partitionDesc[0]);
}
}
if (tag instanceof SinglePartitionCacheTag) {
return new TableCacheTag(tag.tableName);
} else {
- // DB level
- int ix = tag.tableName.indexOf(".");
- if (ix <= 0) {
+ if (tag.tableName.split("\\.", 3).length < 3) {
return null;
}
- return new TableCacheTag(tag.tableName.substring(0, ix));
+ return new TableCacheTag(tag.tableName.substring(0,
tag.tableName.lastIndexOf('.')));
}
}
@@ -381,4 +394,3 @@ private static String[] decodePartDesc(String partDesc) {
}
}
-