[ 
https://issues.apache.org/jira/browse/HIVE-29632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vikram Ahuja updated HIVE-29632:
--------------------------------
    Description: 
  {{hive.fetch.task.caching}} defaults to {{true}} in Hive 4. When a query 
qualifies for fetch task optimization 
({{hive.fetch.task.conversion=more}}), {{FetchTask.execute()}} pre-loads 
the entire result set into a {{List}} in JVM heap before serving any rows to 
the client. On a non-ACID table with no LIMIT clause, the row limit falls back 
to {{Integer.MAX_VALUE}}, so the task attempts to load every row, causing 
{{OutOfMemoryError}} on any table large enough to fill the heap.

 

The threshold guard ({{hive.fetch.task.conversion.threshold=200MB}}) does 
not protect against this because it compares compressed on-disk bytes against 
the threshold, not JVM heap cost after deserialization. An ORC/Parquet file of 
150 MB on disk can expand to 30+ GB of Java {{String}} objects in heap.

 

The feature was introduced in HIVE-25976 specifically to prevent Hive Cleaner 
race conditions on transactional (ACID) tables, where files can be deleted 
mid-fetch by the Cleaner. It has no benefit for non-ACID tables as the Cleaner 
does not operate on them, yet it is applied unconditionally to all table types.

 

 *What hive.fetch.task.caching=true does*

 When enabled, {{SimpleFetchOptimizer}} sets {{cachingEnabled=true}} on the 
{{FetchTask}}. {{Driver}} calls {{fetchTask.execute()}}, which calls 
{{executeInner(fetchedData)}}, as in this excerpt:
{code:java}
// FetchTask.java (excerpt)
  public int execute() {
    if (cachingEnabled) {
      executeInner(fetchedData);  // loads ALL rows before serving any
    }
    return 0;
  }

  private boolean executeInner(List target) {
    int rowsRet;
    if (cachingEnabled) {
      rowsRet = work.getLimit() >= 0 ? work.getLimit() : Integer.MAX_VALUE;
    }
    // ...
    while (sink.getNumRows() < rowsRet) {
      fetch.pushRow();  // reads every row from HDFS into fetchedData
    }
    // ...
  }
{code}
 Each row is serialized to a tab-separated {{java.lang.String}} by 
{{DefaultFetchFormatter}} before being stored in {{fetchedData}} (an 
{{ArrayList}}). All ORC/Parquet optimizations (dictionary encoding, RLE, 
columnar layout, block compression) are discarded. Repeated values (e.g. a 
country code column with 10 distinct values across 40M rows) become 40M 
separate {{String}} objects with no sharing.
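
The loss of sharing can be illustrated with a minimal sketch. It does not reproduce {{DefaultFetchFormatter}}; the class {{RowStringSharing}} and the method {{formatRow}} are hypothetical stand-ins that simply join column values with tabs. It only shows that two rows with identical contents still end up as two separate {{String}} objects on the heap.

```java
final class RowStringSharing {
    // Hypothetical stand-in for a row formatter: joins column values with tabs.
    static String formatRow(String... columns) {
        return String.join("\t", columns);
    }

    public static void main(String[] args) {
        // Two rows that carry exactly the same column values.
        String first = formatRow("1001", "US", "OK");
        String second = formatRow("1001", "US", "OK");

        // The contents match, but each call allocates its own String,
        // so the repeated values are stored once per row.
        System.out.println("equal contents: " + first.equals(second));
        System.out.println("same object:    " + (first == second));
    }
}
```

Each formatted row carries its own copy of every repeated value, which is why a dictionary-encoded column on disk gains nothing once the rows are held as strings.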

 *Memory amplification*

  ||Representation||Size||
  |ORC/Parquet compressed on disk|~150 MB|
  |Decompressed raw bytes|~3 GB|
  |Java String objects in heap (tab-separated, no compression, no dictionary sharing)|~34 GB|

  The ~200x amplification causes {{OutOfMemoryError}} on any heap smaller than 
the fully-deserialized result set.

 

*Steps to Reproduce*
{code:sql}
  -- Low-cardinality Parquet table (a high compression ratio is essential to
  -- stay under the threshold)
  CREATE TABLE transactions (
    txn_id BIGINT, acct_id STRING, mrch_id STRING, txn_amt DOUBLE,
    txn_dt STRING, ctry_cd STRING, prod_cd STRING,
    status_cd STRING, channel_cd STRING, proc_cd STRING
  )
  STORED AS PARQUET
  TBLPROPERTIES ("parquet.compression"="SNAPPY");
{code}
Generate 4M rows with low-cardinality values (20 account IDs, 10 merchant IDs, 
4 status codes, etc.) so that Parquet with Snappy compresses them to ~30 MB. 
Insert the same 4M rows 4 times to reach 16M rows (~120 MB on disk in total).
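
One possible way to produce such rows is sketched below. Only the cardinalities of 20 account IDs, 10 merchant IDs, and 4 status codes come from the description; the class name {{LowCardinalityRows}}, the value formats, and the cardinalities of the remaining columns are assumptions made for illustration. The sketch prints comma-separated rows, which would still need to be loaded into the Parquet table (for example through a text staging table).

```java
import java.util.Locale;

final class LowCardinalityRows {
    // Builds one comma-separated row in the column order of the transactions table.
    static String row(long i) {
        return String.join(",",
            Long.toString(i),                                          // txn_id (unique)
            "ACCT" + (i % 20),                                         // acct_id: 20 values
            "MRCH" + (i % 10),                                         // mrch_id: 10 values
            String.valueOf((i % 100) + 0.5),                           // txn_amt (assumed range)
            String.format(Locale.ROOT, "2024-01-%02d", (i % 28) + 1),  // txn_dt (assumed range)
            "C" + (i % 10),                                            // ctry_cd (assumed: 10 values)
            "P" + (i % 5),                                             // prod_cd (assumed: 5 values)
            "S" + (i % 4),                                             // status_cd: 4 values
            "CH" + (i % 3),                                            // channel_cd (assumed: 3 values)
            "PR" + (i % 6));                                           // proc_cd (assumed: 6 values)
    }

    public static void main(String[] args) {
        // Row count comes from the first argument; defaults to a small sample.
        long count = args.length > 0 ? Long.parseLong(args[0]) : 5;
        for (long i = 0; i < count; i++) {
            System.out.println(row(i));
        }
    }
}
```

Running it with {{4000000}} as the argument would emit the 4M rows used in the reproduction; the resulting on-disk size depends on the actual values and is not guaranteed to match the figures above.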
{code:sql}
  -- Verify file size stays under 200MB threshold (caching will be enabled)
  -- hdfs dfs -du -s -h /warehouse/.../transactions/
  -- Expected: ~120 MB
  -- Trigger OOM
  SELECT * FROM transactions;
{code}
 

 *HiveServer2 configuration*

  -Xmx6g
  hive.fetch.task.conversion=more
  hive.fetch.task.caching=true          (default in Hive 4)
  hive.fetch.task.conversion.threshold=209715200  (200MB default)

*Observed GC pattern before crash*

  [GC pause (G1 Evacuation Pause)]  heap: 2048M->2040M(6144M)
  [GC pause (G1 Evacuation Pause)]  heap: 4096M->4090M(6144M)
  [Full GC (Allocation Failure)]    heap: 6140M->6140M(6144M)  <- zero freed
  [Full GC (Allocation Failure)]    heap: 6140M->6140M(6144M)  <- zero freed
  java.lang.OutOfMemoryError: Java heap space

 *Expected Behavior*

  - {{SELECT * FROM non_acid_table}} completes successfully, streaming rows 
batch-by-batch to the client
  - Heap usage remains bounded during fetch; GC can reclaim memory between 
queries
  - {{hive.fetch.task.caching}} only affects transactional (ACID) tables, which 
are the only table type for which the Cleaner race condition exists
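
The bounded-heap behavior expected above can be sketched as a pull-based loop that holds at most one batch at a time. This is only an illustration of the idea, not Hive's fetch path; {{BatchedFetch}}, {{nextBatch}}, and the batch size of 4 are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class BatchedFetch {
    // Pulls at most maxRows rows from the source; an empty list means the source is exhausted.
    static <T> List<T> nextBatch(Iterator<T> rows, int maxRows) {
        List<T> batch = new ArrayList<>();
        while (batch.size() < maxRows && rows.hasNext()) {
            batch.add(rows.next());
        }
        return batch;
    }

    public static void main(String[] args) {
        Iterator<Integer> rows = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).iterator();
        List<Integer> batch;
        // Only the current batch is referenced, so earlier batches can be collected.
        while (!(batch = nextBatch(rows, 4)).isEmpty()) {
            System.out.println("sent batch of " + batch.size() + " rows");
        }
    }
}
```

With this shape, peak memory is governed by the batch size rather than by the size of the table.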

 *Actual Behavior*

  - {{FetchTask}} pre-loads all rows into {{fetchedData: ArrayList}} before 
returning any rows
  - Heap fills to 100%; G1GC enters a death spiral of Full GCs with zero bytes 
freed
  - HiveServer2 crashes with {{java.lang.OutOfMemoryError: Java heap space}}

 *Fix*

  The fix is a small guard in {{SimpleFetchOptimizer.optimize()}} that disables 
caching for non-transactional tables:
{code:java}
  // SimpleFetchOptimizer.java
  boolean cachingEnabled = HiveConf.getBoolVar(pctx.getConf(),
      HiveConf.ConfVars.HIVE_FETCH_TASK_CACHING);
  if (cachingEnabled && !AcidUtils.isTransactionalTable(fetch.table)) {
    LOG.debug("Fetch task caching is enabled but table {} is not transactional. "
        + "Caching is only supported for ACID tables to prevent Cleaner race "
        + "conditions. Disabling.",
        fetch.table.getCompleteName());
    cachingEnabled = false;
  }
  fetchTask.setCachingEnabled(cachingEnabled);
{code}
  This preserves the original HIVE-25976 intent (caching for ACID tables) while 
eliminating the OOM risk for all other table types.
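
The decision made by the guard reduces to a two-input predicate, which the following sketch spells out so that all four combinations can be checked in isolation. {{FetchCachingGuard}} and {{effectiveCaching}} are hypothetical names and are not part of Hive.

```java
final class FetchCachingGuard {
    // Caching stays on only when the setting is enabled AND the table is transactional.
    static boolean effectiveCaching(boolean configEnabled, boolean transactionalTable) {
        return configEnabled && transactionalTable;
    }

    public static void main(String[] args) {
        boolean[] values = {false, true};
        // Print the full truth table of the guard.
        for (boolean config : values) {
            for (boolean acid : values) {
                System.out.println("config=" + config + " acid=" + acid
                    + " -> caching=" + effectiveCaching(config, acid));
            }
        }
    }
}
```

Only the combination of an enabled setting and a transactional table keeps caching on, so disabling {{hive.fetch.task.caching}} continues to work as before for ACID tables.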

 *Additional issues not addressed by this fix (follow-up work)*

  The threshold check uses compressed on-disk size: 
{{checkThresholdWithMetastoreStats()}} should use {{RAW_DATA_SIZE}} or 
{{numRows}} × estimated row size instead of {{TOTAL_SIZE}}.
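
A rough sketch of what such a check could look like follows. It is not the existing {{checkThresholdWithMetastoreStats()}} logic; {{FetchThresholdEstimate}}, its methods, the 100-byte average row size, and the choice to reject tables without statistics are all assumptions made for illustration.

```java
final class FetchThresholdEstimate {
    static final long UNKNOWN = -1L;

    // Prefers the raw (uncompressed) size; falls back to numRows x average row size.
    static long estimateBytes(long rawDataSize, long numRows, long avgRowBytes) {
        if (rawDataSize > 0) {
            return rawDataSize;
        }
        if (numRows > 0 && avgRowBytes > 0) {
            try {
                return Math.multiplyExact(numRows, avgRowBytes);
            } catch (ArithmeticException overflow) {
                return Long.MAX_VALUE; // saturate instead of wrapping around
            }
        }
        return UNKNOWN;
    }

    // Assumption: with no usable statistics, the query does not qualify.
    static boolean qualifies(long estimatedBytes, long thresholdBytes) {
        return estimatedBytes != UNKNOWN && estimatedBytes <= thresholdBytes;
    }

    public static void main(String[] args) {
        long threshold = 209_715_200L; // 200 MB default from the configuration above
        // ~3 GB of decompressed data no longer passes the check.
        System.out.println(qualifies(estimateBytes(3_000_000_000L, 0, 0), threshold));
        // 16M rows at an assumed 100 bytes per row also exceed the threshold.
        System.out.println(qualifies(estimateBytes(0, 16_000_000L, 100), threshold));
        // A small table still qualifies.
        System.out.println(qualifies(estimateBytes(0, 1_000, 100), threshold));
    }
}
```

Treating missing statistics as "does not qualify" is the conservative choice here; a real change would need to decide how to handle tables whose statistics are absent or stale.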

 *Environment*

  - Hive 4.0.x
  - Java 17
  - G1GC
  - Feature introduced in HIVE-25976

 


> hive.fetch.task.caching=true (default) causes unbounded heap allocation on 
> non-ACID tables, crashing HiveServer2 with OutOfMemoryError
> --------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: HIVE-29632
>                 URL: https://issues.apache.org/jira/browse/HIVE-29632
>             Project: Hive
>          Issue Type: Improvement
>    Affects Versions: 4.0.0, 4.0.1
>            Reporter: Vikram Ahuja
>            Assignee: Vikram Ahuja
>            Priority: Critical
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
