Vikram Ahuja created HIVE-29632:
-----------------------------------
Summary: hive.fetch.task.caching=true (default) causes unbounded
heap allocation on non-ACID tables, crashing HiveServer2 with OutOfMemoryError
Key: HIVE-29632
URL: https://issues.apache.org/jira/browse/HIVE-29632
Project: Hive
Issue Type: Improvement
Affects Versions: 4.0.1, 4.0.0
Reporter: Vikram Ahuja
Assignee: Vikram Ahuja
\{{hive.fetch.task.caching}} defaults to \{{true}} in Hive 4. When a query
qualifies for fetch task optimization (\{{hive.fetch.task.conversion=more}}),
\{{FetchTask.execute()}} pre-loads the entire result set into a \{{List}} in
JVM heap before serving any rows to the client. On a non-ACID table with no
LIMIT clause, the row cap falls back to \{{Integer.MAX_VALUE}}, so it attempts
to load every row, causing \{{OutOfMemoryError}} on any table large enough to
fill the heap.
The threshold guard (\{{hive.fetch.task.conversion.threshold=200MB}}) does
not protect against this because it compares compressed on-disk bytes against
the threshold,
not JVM heap cost after deserialization. An ORC/Parquet file of 150 MB on
disk can expand to 30+ GB of Java \{{String}} objects in heap.
The feature was introduced in
[HIVE-25976|https://issues.apache.org/jira/browse/HIVE-25976] specifically to
prevent Hive Cleaner race conditions on transactional (ACID)
tables, where files can be deleted mid-fetch by the Cleaner. It has no
benefit for non-ACID tables as the Cleaner does not operate on them, yet it is
applied
unconditionally to all table types.
h3. What hive.fetch.task.caching=true does
When enabled, \{{SimpleFetchOptimizer}} sets \{{cachingEnabled=true}} on the
\{{FetchTask}}. After Tez/MR execution completes, \{{Driver}} calls
\{{fetchTask.execute()}},
which calls \{{executeInner(fetchedData)}} with:
\{code:java}
// FetchTask.java
public int execute() {
  if (cachingEnabled) {
    executeInner(fetchedData); // loads ALL rows before serving any
  }
  return 0;
}

private boolean executeInner(List target) {
  int rowsRet;
  if (cachingEnabled) {
    rowsRet = work.getLimit() >= 0 ? work.getLimit() : Integer.MAX_VALUE;
  }
  // ...
  while (sink.getNumRows() < rowsRet) {
    fetch.pushRow(); // reads every row from HDFS into fetchedData
  }
}
\{code}
Each row is serialized to a tab-separated \{{java.lang.String}} by
\{{DefaultFetchFormatter}} before being stored in \{{fetchedData: ArrayList}}.
All ORC/Parquet
optimizations (dictionary encoding, RLE, columnar layout, block compression)
are discarded. Repeated values (e.g. a country code column with 10 distinct
values across
40M rows) become 40M separate \{{String}} objects with no sharing.
h3. Memory amplification
||Representation||Size||
|ORC/Parquet compressed on disk|~150 MB|
|Decompressed raw bytes|~3 GB|
|Java String objects in heap (tab-separated, no compression, no dictionary sharing)|~34 GB|
The ~200x amplification causes \{{OutOfMemoryError}} on any heap smaller than
the fully-deserialized result set.
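As a rough cross-check of the last row of the table, the retained heap for a list of row Strings can be estimated from a row count and an average row length. The sketch below is not Hive code: the class and method names are hypothetical, and the per-object overheads (24 bytes per \{{String}}, a 16-byte array header, one byte per character, a 4-byte list reference) are assumptions that roughly match a 64-bit HotSpot JVM with compressed oops and Latin-1 compact strings. It counts only the retained Strings, not the transient garbage produced while deserializing.

\{code:java}
public class FetchCacheHeapEstimate {
    // Assumed overheads, not measured from HiveServer2; they vary by JVM and flags.
    static final long STRING_OBJECT_BYTES = 24;
    static final long BYTE_ARRAY_HEADER_BYTES = 16;
    static final long LIST_REFERENCE_BYTES = 4;

    // Objects are padded to 8-byte boundaries on typical HotSpot builds.
    static long alignTo8(long bytes) {
        return (bytes + 7) / 8 * 8;
    }

    // Estimated retained bytes for one cached row of the given length.
    static long estimateRowBytes(int charsPerRow) {
        return STRING_OBJECT_BYTES
            + alignTo8(BYTE_ARRAY_HEADER_BYTES + charsPerRow)
            + LIST_REFERENCE_BYTES;
    }

    // Estimated retained bytes for the whole cached result set.
    static long estimateHeapBytes(long rows, int charsPerRow) {
        return Math.multiplyExact(rows, estimateRowBytes(charsPerRow));
    }

    public static void main(String[] args) {
        long rows = 16_000_000L; // hypothetical row count
        int chars = 100;         // hypothetical average row length
        double gib = estimateHeapBytes(rows, chars) / (1024.0 * 1024 * 1024);
        System.out.printf("about %.2f GiB retained for %d rows%n", gib, rows);
    }
}
\{code}

Even under these conservative assumptions the estimate scales linearly with row count and is independent of how well the file compresses on disk.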
h3. Broken threshold guard
\{{SimpleFetchOptimizer.checkThresholdWithMetastoreStats()}} uses
\{{StatsSetupConst.TOTAL_SIZE}} from HMS stats, which is the compressed file
size on disk:
\{code:java}
// SimpleFetchOptimizer.java - FetchData.checkThresholdWithMetastoreStats()
long dataSize = StatsUtils.getTotalSize(table); // compressed bytes!
status = (threshold - dataSize) >= 0 ? Status.PASS : Status.FAIL;
\{code}
A 150 MB ORC file passes the default 200 MB threshold check, caching is
enabled, and 34 GB of String objects flood the heap. The \{{HiveConf}} javadoc
for
\{{HIVE_FETCH_TASK_CACHING}} acknowledges this: "the
hive.fetch.task.conversion.threshold must be adjusted accordingly. That is
200MB by default which must be lowered in
case of enabled caching" — but this is never enforced in code.
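One way the guard could reflect heap cost rather than on-disk size is to compare the threshold against row count times an estimated in-heap row size. The following is a minimal sketch under that assumption. \{{HeapAwareThreshold}}, \{{check}} and its parameters are hypothetical names, it calls no Hive API, and only the PASS/FAIL outcome mirrors the excerpt above.

\{code:java}
public class HeapAwareThreshold {
    enum Status { PASS, FAIL }

    // Passes only when the estimated in-heap size fits under the threshold.
    static Status check(long thresholdBytes, long numRows, long estimatedBytesPerRow) {
        if (numRows < 0 || estimatedBytesPerRow <= 0) {
            return Status.FAIL; // missing or unusable stats: be conservative
        }
        long estimatedHeapBytes;
        try {
            estimatedHeapBytes = Math.multiplyExact(numRows, estimatedBytesPerRow);
        } catch (ArithmeticException overflow) {
            return Status.FAIL; // estimate exceeds Long.MAX_VALUE
        }
        return estimatedHeapBytes <= thresholdBytes ? Status.PASS : Status.FAIL;
    }

    public static void main(String[] args) {
        long threshold = 209_715_200L; // 200 MB, the default quoted in this report
        // Hypothetical table: 16M rows at an assumed 150 bytes per row in heap.
        System.out.println(check(threshold, 16_000_000L, 150));
    }
}
\{code}

Treating unknown statistics as FAIL is a design choice of this sketch: it prefers streaming whenever the heap cost cannot be estimated.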
h3. Retention amplifies impact
With \{{hive.server2.idle.operation.timeout=2h}} (default), unclosed JDBC
operations retain \{{fetchedData}} in heap for up to 2 hours. Multiple
concurrent large queries
cause additive pressure. There is no soft/weak reference, no memory-pressure
eviction, and no size cap on \{{fetchedData}}.
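For illustration, a size cap on a list like \{{fetchedData}} could take roughly the following shape. This is a sketch, not Hive code: \{{BoundedRowCache}}, its fixed per-row overhead guess and the overflow signal are all hypothetical. On overflow it drops what it has cached, so the caller would have to restart the read in streaming mode, which for ACID tables brings back the Cleaner race the cache was meant to avoid.

\{code:java}
import java.util.ArrayList;
import java.util.List;

public class BoundedRowCache {
    // Assumed fixed overhead per cached row (object headers, list slot).
    private static final long PER_ROW_OVERHEAD_BYTES = 48;

    private final long maxBytes;
    private final List<String> rows = new ArrayList<>();
    private long estimatedBytes;
    private boolean overflowed;

    BoundedRowCache(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    // Returns false once the cap would be exceeded; the caller should then
    // stop caching and fall back to streaming rows to the client.
    boolean offer(String row) {
        if (overflowed) {
            return false;
        }
        long cost = PER_ROW_OVERHEAD_BYTES + row.length();
        if (estimatedBytes + cost > maxBytes) {
            overflowed = true;
            rows.clear(); // release cached rows so GC can reclaim them
            estimatedBytes = 0;
            return false;
        }
        rows.add(row);
        estimatedBytes += cost;
        return true;
    }

    boolean isOverflowed() {
        return overflowed;
    }

    int size() {
        return rows.size();
    }

    public static void main(String[] args) {
        BoundedRowCache cache = new BoundedRowCache(120);
        System.out.println(cache.offer("a\tb")); // fits under the cap
        System.out.println(cache.offer("c\td")); // still fits
        System.out.println(cache.offer("e\tf")); // would exceed the cap
    }
}
\{code}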
h3. Why non-ACID tables have no need for caching
The feature was designed for transactional tables only. After compaction
merges delta files, the Hive Cleaner deletes the obsolete base/delta files.
For a slow JDBC client fetching 10K rows at a time over minutes, the Cleaner
can delete files mid-fetch, causing \{{FileNotFoundException}}. Pre-loading
into RAM solves this.
For non-ACID tables (managed, external, ORC, Parquet, Iceberg), the Cleaner
never runs. There is no race condition to prevent. Caching on these tables
provides zero
benefit while introducing unbounded heap allocation.
---
h2. Steps to Reproduce
h3. Setup
\{code:sql}
-- Low-cardinality Parquet table (high compression ratio is essential to stay
-- under threshold)
CREATE TABLE transactions (
txn_id BIGINT, acct_id STRING, mrch_id STRING, txn_amt DOUBLE,
txn_dt STRING, ctry_cd STRING, prod_cd STRING,
status_cd STRING, channel_cd STRING, proc_cd STRING
)
STORED AS PARQUET
TBLPROPERTIES ("parquet.compression"="SNAPPY");
\{code}
Generate 4M rows with low-cardinality values (20 account IDs, 10 merchant
IDs, 4 status codes, etc.) so Parquet+Snappy compresses to ~30 MB for 4M rows.
Insert 4 times
to reach 16M rows (~120 MB on disk total).
\{code:sql}
-- Verify file size stays under 200MB threshold (caching will be enabled)
-- hdfs dfs -du -s -h /warehouse/.../transactions/
-- Expected: ~120 MB
-- Trigger OOM
SELECT * FROM transactions;
\{code}
h3. HiveServer2 configuration
\{noformat}
-Xmx6g
hive.fetch.task.conversion=more
hive.fetch.task.caching=true (default in Hive 4)
hive.fetch.task.conversion.threshold=209715200 (200MB default)
\{noformat}
h3. Observed GC pattern before crash
\{noformat}
[GC pause (G1 Evacuation Pause)] heap: 2048M->2040M(6144M)
[GC pause (G1 Evacuation Pause)] heap: 4096M->4090M(6144M)
[Full GC (Allocation Failure)] heap: 6140M->6140M(6144M) <- zero freed
[Full GC (Allocation Failure)] heap: 6140M->6140M(6144M) <- zero freed
java.lang.OutOfMemoryError: Java heap space
\{noformat}
---
h2. Expected Behavior
- \{{SELECT * FROM non_acid_table}} completes successfully, streaming rows
batch-by-batch to the client
- Heap usage remains bounded during fetch; GC can reclaim memory between
queries
- \{{hive.fetch.task.caching}} only affects transactional (ACID) tables,
which are the only table type for which the Cleaner race condition exists
h2. Actual Behavior
- \{{FetchTask}} pre-loads all rows into \{{fetchedData: ArrayList}} before
returning any rows
- Heap fills to 100%; G1GC enters a death spiral of Full GC with zero bytes
freed
- HiveServer2 crashes with \{{java.lang.OutOfMemoryError: Java heap space}}
---
h2. Fix
The fix is a small guard in \{{SimpleFetchOptimizer.optimize()}} that
disables caching for non-transactional tables:
\{code:java}
// SimpleFetchOptimizer.java
boolean cachingEnabled = HiveConf.getBoolVar(pctx.getConf(),
    HiveConf.ConfVars.HIVE_FETCH_TASK_CACHING);
if (cachingEnabled && !AcidUtils.isTransactionalTable(fetch.table)) {
  LOG.debug("Fetch task caching is enabled but table {} is not transactional. " +
      "Caching is only supported for ACID tables to prevent Cleaner race conditions. Disabling.",
      fetch.table.getCompleteName());
  cachingEnabled = false;
}
fetchTask.setCachingEnabled(cachingEnabled);
\{code}
This preserves the original HIVE-25976 intent (caching for ACID tables) while
eliminating the OOM risk for all other table types.
h3. Additional issues not addressed by this fix (follow-up work)
- Threshold uses compressed disk size: \{{checkThresholdWithMetastoreStats()}}
should use \{{RAW_DATA_SIZE}} or \{{numRows}} × estimated row size instead of
\{{TOTAL_SIZE}}
- No LIMIT guard: when caching is enabled and \{{work.getLimit() < 0}}, the
fetch should fall back to streaming instead of using
\{{rowsRet = Integer.MAX_VALUE}}
- Default should be false: \{{HIVE_FETCH_TASK_CACHING}} defaults to \{{true}};
it should default to \{{false}} and require explicit opt-in
---
h2. Environment
- Hive 4.0.x
- Java 17
- G1GC
- Feature introduced in HIVE-25976
h2. Workaround
Set in \{{hive-site.xml}} and restart HiveServer2:
\{code:xml}
<property>
  <name>hive.fetch.task.caching</name>
  <value>false</value>
</property>
\{code}