gudladona commented on PR #18241:
URL: https://github.com/apache/hudi/pull/18241#issuecomment-3946657582
A quick overview of how the double buffering in this PR works.
# HoodieParquetFileBinaryCopier — Double-Buffer Prefetch Pipeline
```
╔══════════════════════════════════════════════════════════════════════╗
║   HoodieParquetFileBinaryCopier — Double-Buffer Prefetch Pipeline    ║
╚══════════════════════════════════════════════════════════════════════╝

  inputFiles (Queue)        Main Thread         hudi-binary-copy-prefetch
  ┌──────────────┐                                       (daemon)
  │ file-1.pq    │
  │ file-2.pq    │
  │ file-3.pq    │
  │ file-4.pq    │
  └──────────────┘

── binaryCopy() called ─────────────────────────────────────────────────

  triggerPrefetch()         poll file-1 ─────────► FSDataInputStream
    nextFileToPrefetch = file-1                          │
    nextBuffer ──────────────────────────────────────────┤
                                                    readFully()
                                                         ▼
                                                  [ nextBuffer ]
                                                  ░░░░░░░░░░░░░ ← filling

  initNextReader()          join() ◄──────────────────── done
  ┌───────────────────────────────────────────────────────────────────┐
  │ SWAP BUFFERS:                                                     │
  │   currentBuffer ← result.buffer     (was nextBuffer, now full)    │
  │   nextBuffer    ← old currentBuffer (now free)                    │
  └───────────────────────────────────────────────────────────────────┘

  triggerPrefetch() ──────────────────────────────► FSDataInputStream
    nextFileToPrefetch = file-2                          │
    nextBuffer (free) ───────────────────────────────────┤
                                                    readFully()
                                                         ▼
  reader = ByteArrayInputFile(currentBuffer)      [ nextBuffer ]
      │                                           ░░░░░░░░░░░░░ ← filling
      │ PROCESS file-1 (CPU-bound)      I/O overlaps with CPU ↑
      ▼
  ┌──────────────────────────┐
  │ for each row group:      │
  │   processBlocksFromReader│
  │   ┌─────────────────┐    │
  │   │ read row-group  │    │ ← single blockRead() into reusableBlockBuffer
  │   │ into memory     │    │   (avoids per-column S3 seeks)
  │   └────────┬────────┘    │
  │            │             │
  │   for each column:       │
  │     seek(colOffset)      │ ← ByteArraySeekableInputStream.seek()
  │     appendColumnChunk    │ ← zero-copy from buffer to output
  └──────────────────────────┘
      │
      ▼
  initNextReader()          join() ◄──────────────────── done
  ┌───────────────────────────────────────────────────────────────────┐
  │ SWAP BUFFERS:                                                     │
  │   currentBuffer ← result.buffer     (file-2, now full)            │
  │   nextBuffer    ← old currentBuffer (file-1 data, now free)       │
  └───────────────────────────────────────────────────────────────────┘

  triggerPrefetch() ──────────────────────────────► FSDataInputStream
    nextFileToPrefetch = file-3                          │
    nextBuffer (file-1 slot) ────────────────────────────┤
                                                    readFully()
                                                         ▼
  reader = ByteArrayInputFile(currentBuffer)      [ nextBuffer ]
      │                                           ░░░░░░░░░░░░░ ← filling
      │ PROCESS file-2 (CPU-bound)
      ▼
  ... (pattern repeats)

── File too large (> 2 GB) fallback ────────────────────────────────────

  prefetch returns null ──► reader = HadoopInputFile (stream directly from S3)
      │
      ▼ processBlocksFromReader()
        read entire row-group span with blockRead() into reusableBlockBuffer
        (single S3 GET per row group), then seek per-column within that buffer

── close() ─────────────────────────────────────────────────────────────

  prefetchExecutor.shutdownNow()   ← interrupts any in-flight prefetch
  currentBuffer = null             ← release for GC
  nextBuffer = null                ← release for GC
  super.close() → writer.end()     ← finalise Parquet footer

── Memory layout at steady state ───────────────────────────────────────

  currentBuffer       [ file-N data   ░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░ ]
                        ▲ main thread reads (via ByteArrayInputFile /
                          ByteArraySeekableInputStream)

  nextBuffer          [ file-N+1 data ░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░ ]
                        ▲ prefetch thread writes (FSDataInputStream.readFully)

  reusableBlockBuffer [ current row-group span ░░░░░░░░░░░░░░░░░ ]
                        ▲ main thread only — one row-group at a time
                          from currentBuffer
```
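The per-row-group step in the middle of the diagram can be sketched in isolation. This is an illustrative stand-in, not the PR's actual classes: `RowGroupBuffer`, `spanStart`, and `columnChunk` are hypothetical names, with a plain `byte[]` standing in for `reusableBlockBuffer`. The point it demonstrates is that one contiguous read covers the whole row group, after which each column chunk is located by seeking within the in-memory span rather than issuing a remote read per column.

```java
// Hypothetical sketch of per-column access inside a prefetched row-group span.
// One blockRead() fills the span; columns are then sliced out by offset
// arithmetic, emulating ByteArraySeekableInputStream.seek() + appendColumnChunk.
public class RowGroupBuffer {
    private final byte[] span;    // stands in for reusableBlockBuffer
    private final long spanStart; // file offset where the span begins

    public RowGroupBuffer(byte[] span, long spanStart) {
        this.span = span;
        this.spanStart = spanStart;
    }

    /** seek(colOffset) within the span, then copy the column chunk out. */
    public byte[] columnChunk(long colOffset, int length) {
        int pos = (int) (colOffset - spanStart); // seek is just pointer math
        byte[] out = new byte[length];
        System.arraycopy(span, pos, out, 0, length);
        return out;
    }
}
```

With this layout, every column of the row group is served from memory, so the per-column `seek()` calls that would each cost an S3 round trip become array indexing.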
> The two buffers are **never touched by the same party at the same time** — the
> swap only happens on the main thread after `join()` confirms the background
> write is complete, so there is no data race despite no explicit locking.
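That handoff can be sketched as a minimal double-buffer loop. The class and method bodies below are illustrative assumptions, not the PR's code: `DoubleBufferCopier` is a hypothetical name, an in-memory `Queue<byte[]>` stands in for the Parquet file queue, and `System.arraycopy` stands in for `FSDataInputStream.readFully()`. The `Future.get()` call plays the role of `join()`: it establishes the happens-before edge that makes the lock-free swap safe.

```java
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch of the double-buffer prefetch loop described above.
public class DoubleBufferCopier {
    private final ExecutorService prefetchExecutor =
        Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "hudi-binary-copy-prefetch");
            t.setDaemon(true);
            return t;
        });

    private byte[] currentBuffer;    // read by the main thread only
    private byte[] nextBuffer;       // written by the prefetch thread only
    private Future<byte[]> inFlight; // the join() point for the swap

    private final Queue<byte[]> inputFiles; // stand-in for the real file queue

    public DoubleBufferCopier(Queue<byte[]> inputFiles) {
        this.inputFiles = inputFiles;
    }

    public void start() {
        triggerPrefetch();
    }

    /** Kick off a background read of the next file into nextBuffer. */
    private void triggerPrefetch() {
        byte[] next = inputFiles.poll();
        if (next == null) {
            inFlight = null;
            return;
        }
        // Reuse the free buffer when it is large enough, else grow it.
        byte[] target = (nextBuffer != null && nextBuffer.length >= next.length)
            ? nextBuffer : new byte[next.length];
        inFlight = prefetchExecutor.submit(() -> {
            System.arraycopy(next, 0, target, 0, next.length); // readFully() stand-in
            return target;
        });
    }

    /** Wait for the in-flight read, swap buffers, start the next prefetch. */
    public byte[] initNextReader() throws Exception {
        if (inFlight == null) {
            return null; // queue exhausted
        }
        byte[] filled = inFlight.get(); // join(): background write is complete
        byte[] free = currentBuffer;    // old buffer becomes the next target
        currentBuffer = filled;
        nextBuffer = free;
        triggerPrefetch();              // overlap the next file's I/O with CPU work
        return currentBuffer;
    }

    public void close() {
        prefetchExecutor.shutdownNow(); // interrupts any in-flight prefetch
        currentBuffer = null;           // release for GC
        nextBuffer = null;
    }
}
```

Because the swap happens strictly after `get()` returns and the next prefetch is only submitted after the swap, each buffer is owned by exactly one thread at any instant, which is what lets the design avoid explicit locks.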
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]