[ https://issues.apache.org/jira/browse/HIVE-29632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Vikram Ahuja updated HIVE-29632:
--------------------------------
Description:
{{hive.fetch.task.caching}} defaults to {{true}} in Hive 4. When a query
qualifies for fetch task optimization
({{hive.fetch.task.conversion=more}}), {{FetchTask.execute()}} pre-loads
the entire result set into a {{List}} in JVM heap before serving any rows to
the client. On a non-ACID table with no LIMIT clause, the row limit falls back
to {{Integer.MAX_VALUE}}, so the task attempts to load every row, causing
{{OutOfMemoryError}} on any table large enough to fill the heap.
The threshold guard ({{hive.fetch.task.conversion.threshold=200MB}}) does
not protect against this because it compares compressed on-disk bytes against
the threshold, not JVM heap cost after deserialization. An ORC/Parquet file of
150 MB on disk can expand to 30+ GB of Java {{String}} objects in heap.
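The mismatch can be illustrated with a minimal sketch. This is not Hive's threshold code: the class and method names are hypothetical, and the sizes are the ones quoted in this report (150 MB on disk, a 200 MB threshold, 30 GB of heap cost, and the 6 GB heap used in the reproduction).

```java
// Hypothetical sketch, not Hive's implementation: it only contrasts the two comparisons.
class ThresholdGuardSketch {
    static final long MB = 1024L * 1024L;
    static final long GB = 1024L * MB;

    // The kind of check described above: only on-disk bytes are compared to the threshold.
    static boolean passesOnDiskGuard(long onDiskBytes, long thresholdBytes) {
        return onDiskBytes <= thresholdBytes;
    }

    // A heap-aware check would compare the estimated in-heap size to the available heap.
    static boolean fitsInHeap(long estimatedHeapBytes, long maxHeapBytes) {
        return estimatedHeapBytes <= maxHeapBytes;
    }

    public static void main(String[] args) {
        long onDisk = 150 * MB;         // compressed size from the report
        long threshold = 209_715_200L;  // 200 MB default threshold
        long heapCost = 30 * GB;        // "30+ GB" of String objects from the report
        long maxHeap = 6 * GB;          // -Xmx6g from the reproduction setup

        System.out.println("passes on-disk guard: " + passesOnDiskGuard(onDisk, threshold));
        System.out.println("fits in heap:         " + fitsInHeap(heapCost, maxHeap));
    }
}
```

With these figures the on-disk guard lets the query through even though the estimated heap cost is several times the heap size.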
The feature was introduced in HIVE-25976 specifically to prevent Hive Cleaner
race conditions on transactional (ACID) tables, where files can be deleted
mid-fetch by the Cleaner. It has no benefit for non-ACID tables as the Cleaner
does not operate on them, yet it is applied unconditionally to all table types.
*What hive.fetch.task.caching=true does*
When enabled, {{SimpleFetchOptimizer}} sets {{cachingEnabled=true}} on the
{{FetchTask}}. {{Driver}} then calls {{fetchTask.execute()}},
which calls {{executeInner(fetchedData)}}:
{code:java}
// FetchTask.java
public int execute() {
  if (cachingEnabled) {
    executeInner(fetchedData); // loads ALL rows before serving any
  }
  return 0;
}

private boolean executeInner(List target) {
  int rowsRet;
  if (cachingEnabled) {
    rowsRet = work.getLimit() >= 0 ? work.getLimit() : Integer.MAX_VALUE;
  }
  // ...
  while (sink.getNumRows() < rowsRet) {
    fetch.pushRow(); // reads every row from HDFS into fetchedData
  }
}
{code}
Each row is serialized to a tab-separated {{java.lang.String}} by
{{DefaultFetchFormatter}} before being stored in {{fetchedData}} (an
{{ArrayList}}). All ORC/Parquet optimizations (dictionary encoding, RLE,
columnar layout, block compression) are discarded. Repeated values (e.g. a
country code column with 10 distinct values across 40M rows) become 40M
separate {{String}} objects with no sharing.
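The loss of sharing can be shown with a small, self-contained sketch. It does not use {{DefaultFetchFormatter}}; it assumes only that each row is built into a fresh tab-separated {{String}}, and the class name, column values, and row count are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: rows with few distinct values still occupy one String object each.
class RowStringSharingSketch {
    // Builds tab-separated rows the way a text formatter would: one new String per row.
    static List<String> buildRows(int rowCount, String[] countryCodes) {
        List<String> rows = new ArrayList<>(rowCount);
        for (int i = 0; i < rowCount; i++) {
            StringBuilder sb = new StringBuilder();
            sb.append(countryCodes[i % countryCodes.length]).append('\t').append("OK");
            rows.add(sb.toString()); // toString() allocates a new String each time
        }
        return rows;
    }

    // Number of distinct row values (what a dictionary encoding would need to store).
    static int distinctValues(List<String> rows) {
        return new HashSet<>(rows).size();
    }

    // Number of distinct String objects actually held on the heap.
    static int distinctObjects(List<String> rows) {
        Set<String> identities =
            Collections.newSetFromMap(new IdentityHashMap<String, Boolean>());
        identities.addAll(rows);
        return identities.size();
    }

    public static void main(String[] args) {
        String[] codes = {"US", "IN", "DE", "FR", "JP", "BR", "CA", "AU", "GB", "MX"};
        List<String> rows = buildRows(1000, codes);
        System.out.println("distinct values:  " + distinctValues(rows));
        System.out.println("distinct objects: " + distinctObjects(rows));
    }
}
```

The list holds as many {{String}} objects as there are rows, regardless of how few distinct values the data contains.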
*Memory amplification*
||Representation||Size||
|ORC/Parquet compressed on disk|~150 MB|
|Decompressed raw bytes|~3 GB|
|Java String objects in heap (tab-separated, no compression, no dictionary sharing)|~34 GB|
The ~200x amplification causes {{OutOfMemoryError}} on any heap smaller than
the fully-deserialized result set.
*Steps to Reproduce*
{code:sql}
-- Low-cardinality Parquet table (high compression ratio is essential to stay
-- under threshold)
CREATE TABLE transactions (
  txn_id BIGINT, acct_id STRING, mrch_id STRING, txn_amt DOUBLE,
  txn_dt STRING, ctry_cd STRING, prod_cd STRING,
  status_cd STRING, channel_cd STRING, proc_cd STRING
)
STORED AS PARQUET
TBLPROPERTIES ("parquet.compression"="SNAPPY");
{code}
Generate 4M rows with low-cardinality values (20 account IDs, 10 merchant IDs,
4 status codes, etc.) so that Parquet+Snappy compresses them to ~30 MB.
Insert them four times to reach 16M rows (~120 MB on disk in total).
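One way to produce such rows is sketched below. This is not the generator used for the report: the class name, value prefixes, and the pool sizes for columns other than account, merchant, and status are assumptions, and how the CSV lines are then loaded into the Parquet table is left open.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical generator: emits CSV lines matching the 10 columns of the transactions table.
class LowCardinalityRowsSketch {
    // Builds a small pool of values such as ACCT0..ACCT19.
    static String[] pool(String prefix, int size) {
        String[] values = new String[size];
        for (int i = 0; i < size; i++) {
            values[i] = prefix + i;
        }
        return values;
    }

    static String pick(String[] values, Random rnd) {
        return values[rnd.nextInt(values.length)];
    }

    static List<String> generate(int rowCount, long seed) {
        Random rnd = new Random(seed);
        String[] accts = pool("ACCT", 20);      // 20 account IDs (from the report)
        String[] mrchs = pool("MRCH", 10);      // 10 merchant IDs (from the report)
        String[] statuses = pool("ST", 4);      // 4 status codes (from the report)
        String[] dates = pool("2024-01-1", 10); // assumed: 10 dates, 2024-01-10..2024-01-19
        String[] ctrys = pool("C", 10);         // assumed pool size
        String[] prods = pool("P", 5);          // assumed pool size
        String[] channels = pool("CH", 3);      // assumed pool size
        String[] procs = pool("PR", 5);         // assumed pool size

        List<String> rows = new ArrayList<>(rowCount);
        for (int i = 0; i < rowCount; i++) {
            double amount = (rnd.nextInt(50) + 1) * 10.0; // assumed: 50 distinct amounts
            rows.add(String.join(",",
                Long.toString(i), pick(accts, rnd), pick(mrchs, rnd),
                Double.toString(amount), pick(dates, rnd), pick(ctrys, rnd),
                pick(prods, rnd), pick(statuses, rnd), pick(channels, rnd),
                pick(procs, rnd)));
        }
        return rows;
    }

    public static void main(String[] args) {
        for (String row : generate(5, 42L)) {
            System.out.println(row);
        }
    }
}
```

Only {{txn_id}} is unique per row; every other column draws from a small pool, which is what keeps the compressed size low.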
{code:sql}
-- Verify file size stays under 200MB threshold (caching will be enabled)
-- hdfs dfs -du -s -h /warehouse/.../transactions/
-- Expected: ~120 MB
-- Trigger OOM
SELECT * FROM transactions;
{code}
*HiveServer2 configuration*
-Xmx6g
hive.fetch.task.conversion=more (default in Hive 4)
hive.fetch.task.caching=true (default in Hive 4)
hive.fetch.task.conversion.threshold=209715200 (200MB default)
*Observed GC pattern before crash*
[GC pause (G1 Evacuation Pause)] heap: 2048M->2040M(6144M)
[GC pause (G1 Evacuation Pause)] heap: 4096M->4090M(6144M)
[Full GC (Allocation Failure)] heap: 6140M->6140M(6144M) <- zero freed
[Full GC (Allocation Failure)] heap: 6140M->6140M(6144M) <- zero freed
java.lang.OutOfMemoryError: Java heap space
*Actual Behavior*
- {{FetchTask}} pre-loads all rows into {{fetchedData: ArrayList}} before
returning any rows
- Heap fills to 100%, G1GC enters a death spiral of Full GC with zero bytes
freed
- HiveServer2 crashes with {{java.lang.OutOfMemoryError: Java heap space}}
*Fix*
The fix is a small guard in {{SimpleFetchOptimizer.optimize()}} that disables
caching for non-transactional tables:
{code:java}
// SimpleFetchOptimizer.java
boolean cachingEnabled = HiveConf.getBoolVar(pctx.getConf(),
    HiveConf.ConfVars.HIVE_FETCH_TASK_CACHING);
if (cachingEnabled && !AcidUtils.isTransactionalTable(fetch.table)) {
  LOG.debug("Fetch task caching is enabled but table {} is not transactional. " +
      "Caching is only supported for ACID tables to prevent Cleaner race conditions. Disabling.",
      fetch.table.getCompleteName());
  cachingEnabled = false;
}
fetchTask.setCachingEnabled(cachingEnabled);
{code}
This preserves the original HIVE-25976 intent (caching for ACID tables) while
eliminating the OOM risk for all other table types.
*Additional issues not addressed by this fix (follow-up work)*
The same unbounded allocation can still occur for ACID tables, because the
threshold uses compressed on-disk size. {{checkThresholdWithMetastoreStats()}}
should use {{RAW_DATA_SIZE}}, or {{numRows}} × estimated row size, instead of
{{TOTAL_SIZE}}.
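A row-count-based estimate of that kind could look roughly like the sketch below. It is not a patch for {{checkThresholdWithMetastoreStats()}}: the class and method names are hypothetical, and the 100-byte row size is an assumed figure chosen only for illustration.

```java
// Hypothetical sketch of a row-based threshold check, independent of Hive's classes.
class RowBasedThresholdSketch {
    // Estimated in-memory size: row count times an estimated per-row size.
    // multiplyExact throws ArithmeticException on overflow instead of wrapping silently.
    static long estimateHeapBytes(long numRows, long estimatedRowBytes) {
        return Math.multiplyExact(numRows, estimatedRowBytes);
    }

    // Fetch conversion is allowed only if the estimate stays within the threshold.
    static boolean allowsFetchConversion(long numRows, long estimatedRowBytes,
                                         long thresholdBytes) {
        return estimateHeapBytes(numRows, estimatedRowBytes) <= thresholdBytes;
    }

    public static void main(String[] args) {
        long threshold = 209_715_200L; // 200 MB default threshold
        long numRows = 16_000_000L;    // row count from the reproduction steps
        long assumedRowBytes = 100L;   // assumption, not a measured value

        System.out.println("estimated bytes: " + estimateHeapBytes(numRows, assumedRowBytes));
        System.out.println("conversion allowed: "
            + allowsFetchConversion(numRows, assumedRowBytes, threshold));
    }
}
```

Under these assumptions the 16M-row table from the reproduction steps would be rejected, although its ~120 MB on-disk size passes the current check.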
*Environment*
- Hive 4.0.x
- Java 17
- G1GC
- Feature introduced in HIVE-25976
was:
{{hive.fetch.task.caching}} defaults to {{true}} in Hive 4. When a query
qualifies for fetch task optimization
({{hive.fetch.task.conversion=more}}), {{FetchTask.execute()}} pre-loads
the entire result set into a {{List}} in JVM heap before serving any rows to
the client. On a non-ACID table with no LIMIT clause, the row limit falls back
to {{Integer.MAX_VALUE}}, so the task attempts to load every row, causing
{{OutOfMemoryError}} on any table large enough to fill the heap.
The threshold guard ({{hive.fetch.task.conversion.threshold=200MB}}) does
not protect against this because it compares compressed on-disk bytes against
the threshold, not JVM heap cost after deserialization. An ORC/Parquet file of
150 MB on disk can expand to 30+ GB of Java {{String}} objects in heap.
The feature was introduced in HIVE-25976 specifically to prevent Hive Cleaner
race conditions on transactional (ACID) tables, where files can be deleted
mid-fetch by the Cleaner. It has no benefit for non-ACID tables as the Cleaner
does not operate on them, yet it is applied unconditionally to all table types.
*What hive.fetch.task.caching=true does*
When enabled, {{SimpleFetchOptimizer}} sets {{cachingEnabled=true}} on the
{{FetchTask}}. {{Driver}} then calls {{fetchTask.execute()}},
which calls {{executeInner(fetchedData)}}:
{code:java}
// FetchTask.java
public int execute() {
  if (cachingEnabled) {
    executeInner(fetchedData); // loads ALL rows before serving any
  }
  return 0;
}

private boolean executeInner(List target) {
  int rowsRet;
  if (cachingEnabled) {
    rowsRet = work.getLimit() >= 0 ? work.getLimit() : Integer.MAX_VALUE;
  }
  // ...
  while (sink.getNumRows() < rowsRet) {
    fetch.pushRow(); // reads every row from HDFS into fetchedData
  }
}
{code}
Each row is serialized to a tab-separated {{java.lang.String}} by
{{DefaultFetchFormatter}} before being stored in {{fetchedData}} (an
{{ArrayList}}). All ORC/Parquet optimizations (dictionary encoding, RLE,
columnar layout, block compression) are discarded. Repeated values (e.g. a
country code column with 10 distinct values across 40M rows) become 40M
separate {{String}} objects with no sharing.
*Memory amplification*
||Representation||Size||
|ORC/Parquet compressed on disk|~150 MB|
|Decompressed raw bytes|~3 GB|
|Java String objects in heap (tab-separated, no compression, no dictionary sharing)|~34 GB|
The ~200x amplification causes {{OutOfMemoryError}} on any heap smaller than
the fully-deserialized result set.
*Steps to Reproduce*
{code:sql}
-- Low-cardinality Parquet table (high compression ratio is essential to stay
-- under threshold)
CREATE TABLE transactions (
  txn_id BIGINT, acct_id STRING, mrch_id STRING, txn_amt DOUBLE,
  txn_dt STRING, ctry_cd STRING, prod_cd STRING,
  status_cd STRING, channel_cd STRING, proc_cd STRING
)
STORED AS PARQUET
TBLPROPERTIES ("parquet.compression"="SNAPPY");
{code}
Generate 4M rows with low-cardinality values (20 account IDs, 10 merchant IDs,
4 status codes, etc.) so Parquet+Snappy compresses to ~30 MB for 4M rows.
Insert 4 times to reach 16M rows (~120 MB on disk total).
{code:sql}
-- Verify file size stays under 200MB threshold (caching will be enabled)
-- hdfs dfs -du -s -h /warehouse/.../transactions/
-- Expected: ~120 MB
-- Trigger OOM
SELECT * FROM transactions;
{code}
*HiveServer2 configuration*
-Xmx6g
hive.fetch.task.conversion=more
hive.fetch.task.caching=true (default in Hive 4)
hive.fetch.task.conversion.threshold=209715200 (200MB default)
*Observed GC pattern before crash*
[GC pause (G1 Evacuation Pause)] heap: 2048M->2040M(6144M)
[GC pause (G1 Evacuation Pause)] heap: 4096M->4090M(6144M)
[Full GC (Allocation Failure)] heap: 6140M->6140M(6144M) <- zero freed
[Full GC (Allocation Failure)] heap: 6140M->6140M(6144M) <- zero freed
java.lang.OutOfMemoryError: Java heap space
*Expected Behavior*
- {{SELECT * FROM non_acid_table}} completes successfully, streaming rows
batch-by-batch to the client
- Heap usage remains bounded during fetch; GC can reclaim memory between
queries
- {{hive.fetch.task.caching}} only affects transactional (ACID) tables, which
are the only table type for which the Cleaner race condition exists
*Actual Behavior*
- {{FetchTask}} pre-loads all rows into {{fetchedData: ArrayList}} before
returning any rows
- Heap fills to 100%, G1GC enters a death spiral of Full GC with zero bytes
freed
- HiveServer2 crashes with {{java.lang.OutOfMemoryError: Java heap space}}
*Fix*
The fix is a small guard in {{SimpleFetchOptimizer.optimize()}} that disables
caching for non-transactional tables:
{code:java}
// SimpleFetchOptimizer.java
boolean cachingEnabled = HiveConf.getBoolVar(pctx.getConf(),
    HiveConf.ConfVars.HIVE_FETCH_TASK_CACHING);
if (cachingEnabled && !AcidUtils.isTransactionalTable(fetch.table)) {
  LOG.debug("Fetch task caching is enabled but table {} is not transactional. " +
      "Caching is only supported for ACID tables to prevent Cleaner race conditions. Disabling.",
      fetch.table.getCompleteName());
  cachingEnabled = false;
}
fetchTask.setCachingEnabled(cachingEnabled);
{code}
This preserves the original HIVE-25976 intent (caching for ACID tables) while
eliminating the OOM risk for all other table types.
*Additional issues not addressed by this fix (follow-up work)*
The threshold uses compressed on-disk size: {{checkThresholdWithMetastoreStats()}}
should use {{RAW_DATA_SIZE}}, or {{numRows}} × estimated row size, instead of
{{TOTAL_SIZE}}.
*Environment*
- Hive 4.0.x
- Java 17
- G1GC
- Feature introduced in HIVE-25976
> hive.fetch.task.caching=true (default) causes unbounded heap allocation on
> non-ACID tables, crashing HiveServer2 with OutOfMemoryError
> --------------------------------------------------------------------------------------------------------------------------------------
>
> Key: HIVE-29632
> URL: https://issues.apache.org/jira/browse/HIVE-29632
> Project: Hive
> Issue Type: Improvement
> Affects Versions: 4.0.0, 4.0.1
> Reporter: Vikram Ahuja
> Assignee: Vikram Ahuja
> Priority: Critical
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)