andyguwc opened a new issue, #16600:
URL: https://github.com/apache/iceberg/issues/16600

   ### Summary
   
   Iceberg 1.11.0 made two changes in `iceberg-parquet` that, in combination, 
force every `S3FileIO`-backed Parquet read through parquet-hadoop's vectored 
I/O code path using a hardcoded on-heap `HeapByteBufferAllocator`. For tables 
with high compression ratios (common for JSON / wide-text columns under zstd), 
this leads to executor `java.lang.OutOfMemoryError: Java heap space` during 
`BatchScan` reads — especially during MERGE on the target side — with no 
user-facing configuration to disable the behavior from outside Iceberg source.
   
   This regression did not exist in 1.10.x: the prerequisite adapter class was 
not present, and the read-options builder did not hardcode vectored I/O on.
   
   ### Environment
   
   - Apache Iceberg: **1.11.0** (`iceberg-spark-runtime-3.5_2.12-1.11.0.jar`)
   - Spark 3.5 (AWS Glue 5.1 runtime), Java 17 (Corretto 17.0.19)
   - FileIO: `S3FileIO`, Glue Data Catalog
   - Affected workers: G.1X (4 vCPU / 16 GB), R.2X (8 vCPU / 64 GB), various 
counts
   - Affected table characteristics (observed across ≥5 production CDC tables):
     - `write.parquet.compression-codec=zstd`
     - `write.target-file-size-bytes=536870912` (512 MB)
     - default `write.parquet.row-group-size-bytes` (128 MB)
     - observed zstd compression ratios: **7.4×, 7.6×, 13.2×, 14.3×**
   
   ### Symptom
   
   `java.lang.OutOfMemoryError: Java heap space` during the target-side 
`BatchScan` of a `MERGE INTO`, followed by cascading executor losses (`Remote 
RPC client disassociated`) and a job abort after 4 task retries on the same 
stage.
   
   Representative stack trace:
   
   ```
   java.lang.OutOfMemoryError: Java heap space
     at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:64)
     at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:363)
     at org.apache.iceberg.shaded.org.apache.parquet.bytes.HeapByteBufferAllocator.allocate(HeapByteBufferAllocator.java:34)
     at org.apache.iceberg.parquet.ParquetIO$ParquetRangeReadableInputStreamAdapter$$Lambda$.../apply(Unknown Source)
     at org.apache.iceberg.io.RangeReadable.readVectored(RangeReadable.java:108)
     at org.apache.iceberg.parquet.ParquetIO$ParquetRangeReadableInputStreamAdapter.readVectored(ParquetIO.java:180)
     at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader.readVectored(ParquetFileReader.java:1357)
     at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader.readAllPartsVectoredOrNormal(ParquetFileReader.java:1274)
     at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader.internalReadRowGroup(ParquetFileReader.java:1185)
     at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:1135)
     at org.apache.iceberg.parquet.ParquetReader$FileIterator.advance(ParquetReader.java:161)
     ...
     at org.apache.iceberg.spark.source.BaseReader.next(BaseReader.java:139)
   ```
   
   ### Root cause (line-cited against `apache-iceberg-1.11.0`)
   
   Two changes landed together in 1.11.0:
   
   **1.** `ParquetIO` introduced a new adapter that exposes parquet-hadoop's 
multi-range `readVectored` to any `SeekableInputStream` that also implements 
`RangeReadable` (which includes `S3InputStream`):
   
   
[`parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java#L141-L185`](https://github.com/apache/iceberg/blob/apache-iceberg-1.11.0/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java#L141-L185)
   
   ```java
   static class ParquetRangeReadableInputStreamAdapter<
           T extends org.apache.iceberg.io.SeekableInputStream & RangeReadable>
       extends DelegatingSeekableInputStream implements RangeReadable {
     ...
     @Override
     public void readVectored(List<ParquetFileRange> ranges, ByteBufferAllocator allocate)
         throws IOException {
       List<FileRange> delegateRange = convertRanges(ranges);
       delegate.readVectored(delegateRange, allocate::allocate);
     }
   }
   ```
   
   **2.** `Parquet.ReadBuilder` unconditionally enables vectored I/O when 
building `ParquetReadOptions` for the reader path:
   
   
[`parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java#L1486`](https://github.com/apache/iceberg/blob/apache-iceberg-1.11.0/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java#L1486)
   
   ```java
   optionsBuilder.withUseHadoopVectoredIo(true);
   ParquetReadOptions options = optionsBuilder.build();
   ```
   
   There is no config lookup, no `if` branch, and no honoring of 
`parquet.hadoop.vectored.io.enabled`. Even when a user explicitly sets 
`spark.hadoop.parquet.hadoop.vectored.io.enabled=false` (which flows into the 
Hadoop Configuration), line 1486 overrides it back to `true`. We confirmed this 
empirically: the stack traces are identical with and without the override.
   
   In combination, every `S3FileIO`-backed read going through Iceberg's 
batch/row reader path now uses parquet-hadoop's vectored read codepath. The 
Parquet vectored read allocates one `ByteBuffer` per column chunk via the 
allocator returned by `ParquetReadOptions.getAllocator()`. That allocator 
defaults to `HeapByteBufferAllocator.getInstance()` because Iceberg never calls 
`.withAllocator(...)` on the builder.
   
   For a 128 MB compressed row group on a table with a 14× compression ratio, 
these allocations pre-stage ~128 MB on-heap, which then decompresses to a 
~1.8 GB (128 MB × 14) working set per row group. With 4 concurrent tasks per 
executor on a G.1X-class worker (~10 GB heap), cumulative on-heap pressure 
overruns the JVM heap.
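
As a rough illustration of the arithmetic above, the following back-of-envelope sketch multiplies out the figures quoted in this issue (128 MB compressed row group, 14× ratio, 4 concurrent tasks). It is a simplified model, not a measurement: it assumes every task stages one full row group at the same moment, and the class and method names are made up for this example.

```java
// Back-of-envelope model only; inputs are the figures quoted in the issue text.
final class HeapPressureEstimate {
  static final long MB = 1024L * 1024L;

  /** Compressed bytes staged on-heap if every task reads one full row group at once. */
  static long stagedBytes(long compressedRowGroupBytes, int concurrentTasks) {
    return compressedRowGroupBytes * concurrentTasks;
  }

  /** Approximate decompressed size of one row group for a given compression ratio. */
  static long decompressedBytes(long compressedRowGroupBytes, double ratio) {
    return Math.round(compressedRowGroupBytes * ratio);
  }

  public static void main(String[] args) {
    long rowGroup = 128 * MB; // compressed row group size from the issue
    int tasks = 4;            // concurrent tasks per executor on a G.1X-class worker
    double ratio = 14.0;      // one of the observed compression ratios, rounded

    long staged = stagedBytes(rowGroup, tasks);
    long workingSet = decompressedBytes(rowGroup, ratio) * tasks;

    System.out.printf("staged on-heap across tasks: %d MB%n", staged / MB);
    System.out.printf("decompressed working set across tasks: %d MB%n", workingSet / MB);
  }
}
```

Under these assumptions the staged buffers and the decompressed working set together approach the ~10 GB heap before any other Spark memory use is counted.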
   
   ### Why this is a regression
   
   `apache-iceberg-1.10.2` for comparison:
   
   - [`ParquetIO.java`](https://github.com/apache/iceberg/blob/apache-iceberg-1.10.2/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java) 
does NOT contain `ParquetRangeReadableInputStreamAdapter`. Only 
`ParquetInputStreamAdapter` exists, which does not implement `RangeReadable`. 
parquet-hadoop's `f instanceof RangeReadable` check fails and 
`readAllPartsVectoredOrNormal` takes the "Normal" branch.
   - [`Parquet.java`](https://github.com/apache/iceberg/blob/apache-iceberg-1.10.2/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java) 
does NOT call `optionsBuilder.withUseHadoopVectoredIo(true)`.
   
   Net effect on 1.10.2 and earlier: vectored I/O was effectively inert for 
`S3FileIO`-backed reads, the on-heap allocator was never invoked for bulk 
column-chunk reads, and the OOM described above did not occur.
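
The interaction between the two changes can be pictured with a toy dispatch model. This is only a sketch of the behavior as described in this issue: every type name below is a hypothetical stand-in, not a real Iceberg or parquet-hadoop class, and the real check inside parquet-hadoop may be structured differently.

```java
// Toy model of the dispatch described above. All type names here are
// hypothetical stand-ins, not the real Iceberg or parquet-hadoop classes.
final class VectoredDispatchModel {
  interface Stream {}

  // Stands in for the RangeReadable capability.
  interface RangeCapable {}

  // 1.10.x-style adapter: no range capability.
  static final class PlainAdapter implements Stream {}

  // 1.11.0-style adapter: also exposes range reads.
  static final class RangeAdapter implements Stream, RangeCapable {}

  // The vectored branch is taken only when the flag is on AND the stream is range-capable.
  static String readPath(Stream stream, boolean vectoredEnabled) {
    return (vectoredEnabled && stream instanceof RangeCapable) ? "vectored" : "normal";
  }

  public static void main(String[] args) {
    System.out.println(readPath(new PlainAdapter(), true));  // old adapter, flag on
    System.out.println(readPath(new RangeAdapter(), true));  // new adapter, flag forced on
    System.out.println(readPath(new RangeAdapter(), false)); // new adapter, flag honored as off
  }
}
```

In this model the old adapter stays on the normal path whatever the flag says, which is why the flag alone was harmless before 1.11.0. Once the adapter is range-capable, the flag is the only remaining switch.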
   
   ### Why this is impactful
   
   - **No outside-source mitigation exists.** Setting 
`spark.hadoop.parquet.hadoop.vectored.io.enabled=false` is silently overridden 
by line 1486. There is no Spark / Iceberg configuration that disables this code 
path.
   - **No allocator configuration is exposed.** `ParquetReadOptions.Builder` 
supports `.withAllocator(...)`, but Iceberg does not call it, so users cannot 
supply `DirectByteBufferAllocator` to move these buffers off-heap.
   - **The user-visible message is misleading.** `Java heap space` plus 
executor losses suggest "grow your workers," but doing so only works around a 
misallocation rather than fixing it. The buffers in question are bulk I/O 
staging buffers; they belong off-heap (alongside Spark shuffle / unified 
memory), not on-heap.
   - Production workarounds we've had to apply per affected table:
     - `spark.executor.cores=2` (reduce concurrent on-heap allocations)
     - Worker class bump (G.1X → R.1X / R.2X)
     - Reduce `write.parquet.row-group-size-bytes` from 128 MB to 64 MB or 32 
MB, then `rewrite_data_files` with `rewrite-all: true` to migrate existing files
   
   Each workaround carries a cost: extra compute (worker class bump), reduced 
parallelism (`spark.executor.cores=2` halves the slot count), or a one-time 
data rewrite (row-group reduction).
   
   ### Proposed fixes
   
   Two small, complementary changes resolve the regression:
   
   **Fix A — honor the existing Parquet/Hadoop conf for vectored I/O.** Change 
[`Parquet.java#L1486`](https://github.com/apache/iceberg/blob/apache-iceberg-1.11.0/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java#L1486)
 from:
   
   ```java
   optionsBuilder.withUseHadoopVectoredIo(true);
   ```
   
   to either honor the standard Parquet property explicitly (default still 
`true`):
   
   ```java
   optionsBuilder.withUseHadoopVectoredIo(
       PropertyUtil.propertyAsBoolean(properties,
           ParquetReader.HADOOP_VECTORED_IO_ENABLED,  // or a new constant
           true));
   ```
   
   …or simply drop the line and let `ParquetReadOptions` resolve from the 
underlying `ParquetConfiguration` it was built with (the 
`HadoopReadOptions.builder(conf)` branch at line 1469 already passes the Hadoop 
conf through).
   
   This gives operators an escape hatch matching standard Parquet semantics.
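
To make the intended semantics of Fix A concrete, here is a minimal standalone sketch of a "default `true`, but honor an explicit `false`" lookup. The local `propertyAsBoolean` helper is a stand-in written for this example and is only assumed to behave like the Iceberg utility used in the snippet above. The property key is the one named in this issue.

```java
import java.util.Map;

// Standalone sketch; the helper below is a local stand-in, not Iceberg's PropertyUtil.
final class VectoredIoFlag {
  // Property name as quoted in the issue text.
  static final String VECTORED_IO_ENABLED = "parquet.hadoop.vectored.io.enabled";

  /** Returns the configured value, or defaultValue when the key is absent. */
  static boolean propertyAsBoolean(Map<String, String> props, String key, boolean defaultValue) {
    String value = props.get(key);
    return value == null ? defaultValue : Boolean.parseBoolean(value);
  }

  public static void main(String[] args) {
    // No explicit setting: vectored I/O stays on, matching current 1.11.0 behavior.
    System.out.println(propertyAsBoolean(Map.of(), VECTORED_IO_ENABLED, true));

    // Explicit opt-out: the operator's choice is respected instead of overridden.
    System.out.println(
        propertyAsBoolean(Map.of(VECTORED_IO_ENABLED, "false"), VECTORED_IO_ENABLED, true));
  }
}
```

Keeping the default at `true` means users who benefit from vectored reads see no change, while affected tables can opt out without patching Iceberg.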
   
   **Fix B — make the allocator configurable, default to off-heap for the 
vectored read path.** Expose a Parquet-allocator selection via an Iceberg read 
property (e.g. `read.parquet.bytes-allocator=heap|direct`) and route it into 
the builder:
   
   ```java
   ByteBufferAllocator allocator = selectAllocator(properties);
   optionsBuilder.withAllocator(allocator);
   ```
   
   Fix A alone is sufficient to unblock production users immediately. Fix B is 
the correct architectural answer — bulk Parquet column-chunk staging buffers 
belong off-heap, not on the JVM execution heap, regardless of whether vectored 
I/O is opted into.
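
One possible shape for the `selectAllocator` helper in Fix B is sketched below using only the JDK, so it runs without parquet on the classpath. It returns an `IntFunction<ByteBuffer>` rather than a Parquet `ByteBufferAllocator`. The property name `read.parquet.bytes-allocator` is the hypothetical one floated above, and the `direct` default is this proposal's suggestion, not existing Iceberg behavior.

```java
import java.nio.ByteBuffer;
import java.util.Locale;
import java.util.Map;
import java.util.function.IntFunction;

// JDK-only sketch of allocator selection; names are illustrative assumptions.
final class AllocatorSelection {
  // Hypothetical property name, as proposed in this issue.
  static final String ALLOCATOR_PROP = "read.parquet.bytes-allocator";

  /** Maps the property value to a buffer factory; defaults to off-heap buffers. */
  static IntFunction<ByteBuffer> selectAllocator(Map<String, String> props) {
    String kind = props.getOrDefault(ALLOCATOR_PROP, "direct").toLowerCase(Locale.ROOT);
    switch (kind) {
      case "heap":
        return ByteBuffer::allocate;       // on-heap, counted against -Xmx
      case "direct":
        return ByteBuffer::allocateDirect; // off-heap, outside the Java heap
      default:
        throw new IllegalArgumentException("Unknown allocator kind: " + kind);
    }
  }

  public static void main(String[] args) {
    ByteBuffer byDefault = selectAllocator(Map.of()).apply(1024);
    ByteBuffer onHeap = selectAllocator(Map.of(ALLOCATOR_PROP, "heap")).apply(1024);
    System.out.println("default is direct: " + byDefault.isDirect());
    System.out.println("heap is direct: " + onHeap.isDirect());
  }
}
```

A real implementation would presumably return Parquet's `HeapByteBufferAllocator` or `DirectByteBufferAllocator` instead, and would need to make sure direct buffers are released once the row group has been consumed.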
   
   ### Reproduction
   
   Any Iceberg table satisfying:
   - Backed by `S3FileIO`
   - `write.parquet.compression-codec=zstd` (or another high-ratio codec)
   - Wide rows / JSON-ish columns (so the uncompressed working set is large)
   - ≥128 MB row groups (the default)
   
   Run a `MERGE INTO` or `BatchScan` workload on Iceberg 1.11.0 with default 
executor configuration on a worker with ≤16 GB heap. Heap OOM appears in 
`HeapByteBufferAllocator.allocate` inside 
`ParquetRangeReadableInputStreamAdapter.readVectored`.
   
   Setting `spark.hadoop.parquet.hadoop.vectored.io.enabled=false` does not 
prevent the OOM — confirmed by identical post-override stack traces.
   
   ### Versions tested
   
   | Version | Vectored I/O code path | OOM observed |
   |---|---|---|
   | 1.7.2 | inert (no `RangeReadable` adapter) | no |
   | 1.10.0 | inert | no (separate `Connection pool shut down` issue) |
   | 1.10.2 | inert | no |
   | **1.11.0** | **active + hardcoded on-heap** | **yes, persistent** |
   
   Happy to provide additional event-log metrics or a minimal reproducible 
Spark job if useful.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

