gudladona commented on PR #18241:
URL: https://github.com/apache/hudi/pull/18241#issuecomment-3946657582

   A quick perspective of how the double buffer works in this PR.
   
   # HoodieParquetFileBinaryCopier — Double-Buffer Prefetch Pipeline
   
   ```
   
╔══════════════════════════════════════════════════════════════════════════════════╗
   ║          HoodieParquetFileBinaryCopier — Double-Buffer Prefetch Pipeline   
     ║
   
╚══════════════════════════════════════════════════════════════════════════════════╝
   
     inputFiles (Queue)         Main Thread                  
hudi-binary-copy-prefetch
     ┌──────────────┐                                               (daemon)
     │  file-1.pq   │
     │  file-2.pq   │
     │  file-3.pq   │
     │  file-4.pq   │
     └──────────────┘
   
     ── binaryCopy() called 
──────────────────────────────────────────────────────────
   
     triggerPrefetch()         poll file-1 ──────────────────► FSDataInputStream
                               nextFileToPrefetch = file-1           │
                               nextBuffer ──────────────────────────►│ 
readFully()
                                                                      ▼
                                                               [ nextBuffer ]
                                                               ░░░░░░░░░░░░░  ← 
filling
   
     initNextReader()          join() ◄─────────────────────────────────────── 
done
     ┌─────────────────────────────────────────────────────────────────────┐
     │  SWAP BUFFERS:                                                      │
     │    currentBuffer  ←  result.buffer  (was nextBuffer, now full)      │
     │    nextBuffer     ←  old currentBuffer  (now free)                  │
     └─────────────────────────────────────────────────────────────────────┘
                               triggerPrefetch() ──────────────────► 
FSDataInputStream
                               nextFileToPrefetch = file-2                │
                               nextBuffer (free) ───────────────────►│ 
readFully()
                                                                      ▼
     reader = ByteArrayInputFile(currentBuffer)              [ nextBuffer ]
                                                             ░░░░░░░░░░░░░  ← 
filling
            │
            │  PROCESS file-1 (CPU-bound)               I/O overlaps with CPU  ↑
            ▼
     ┌──────────────────────────┐
     │  for each row group:     │
     │    processBlocksFromReader│
     │      ┌─────────────────┐ │
     │      │ read row-group  │ │   ← single blockRead() into 
reusableBlockBuffer
     │      │ into memory     │ │     (avoids per-column S3 seeks)
     │      └────────┬────────┘ │
     │               │          │
     │      for each column:    │
     │        seek(colOffset)   │   ← ByteArraySeekableInputStream.seek()
     │        appendColumnChunk │   ← zero-copy from buffer to output
     │                          │
     └──────────────────────────┘
            │
            ▼
     initNextReader()          join() ◄─────────────────────────────────────── 
done
     ┌─────────────────────────────────────────────────────────────────────┐
     │  SWAP BUFFERS:                                                      │
     │    currentBuffer  ←  result.buffer  (file-2, now full)              │
     │    nextBuffer     ←  old currentBuffer  (file-1 data, now free)     │
     └─────────────────────────────────────────────────────────────────────┘
                               triggerPrefetch() ──────────────────► 
FSDataInputStream
                               nextFileToPrefetch = file-3                │
                               nextBuffer (file-1 slot) ────────────►│ 
readFully()
                                                                      ▼
     reader = ByteArrayInputFile(currentBuffer)              [ nextBuffer ]
                                                             ░░░░░░░░░░░░░  ← 
filling
            │
            │  PROCESS file-2 (CPU-bound)
            ▼
           ...  (pattern repeats)
   
   
     ── File too large (> 2 GB) fallback 
─────────────────────────────────────────────
   
     prefetch returns null ──► reader = HadoopInputFile (stream directly from 
S3)
                                   │
                                   ▼  processBlocksFromReader()
                               read entire row-group span with blockRead()
                               into reusableBlockBuffer  (single S3 GET per row 
group)
                               then seek per-column within that buffer
   
   
     ── close() 
──────────────────────────────────────────────────────────────────────
   
       prefetchExecutor.shutdownNow()   ← interrupts any in-flight prefetch
       currentBuffer = null             ← release for GC
       nextBuffer    = null             ← release for GC
       super.close() → writer.end()     ← finalise Parquet footer
   
   
     ── Memory layout at steady state 
────────────────────────────────────────────────
   
       currentBuffer  [ file-N data ░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░ ]
                           ▲ main thread reads (via ByteArrayInputFile / 
ByteArraySeekableInputStream)
   
       nextBuffer     [ file-N+1 data ░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░ ]
                           ▲ prefetch thread writes 
(FSDataInputStream.readFully)
   
       reusableBlockBuffer [ current row-group span ░░░░░░░░░░░░░░░░░ ]
                           ▲ main thread only — one row-group at a time from 
currentBuffer
   ```
   
   > The two buffers are **never touched by the same party at the same time** — 
the swap
   > only happens on the main thread after `join()` confirms the background 
write is
   > complete, so there is no data race despite no explicit locking.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to