nbalajee opened a new issue, #18395:
URL: https://github.com/apache/hudi/issues/18395

   ### Feature Description
   
   **What the feature achieves:**
   
   Introduces an intermediate file staging mechanism that prevents zombie/stray 
Spark executors from leaving duplicate data files visible to query engines. 
When enabled, data files are written to a staging area (the marker directory) 
with a distinguishing prefix, and only promoted to the query-visible data 
partition during finalizeWrite. Files left behind by zombie executors are 
automatically cleaned up when the marker directory is deleted in postCommit.
   
   **Why this feature is needed:**
   
   In Hudi's Spark commit protocol, when a stage is retried, executors from the 
failed attempt (zombies) may continue running and finalize their data files 
after finalizeWrite has already completed -- or even after the commit is 
written. Because Hudi's file naming encodes taskAttemptId in the write token, 
these zombie-written files have unique names and are not overwritten by the 
successful attempt. They become visible to query engines, causing duplicate 
data.
   
   This is particularly problematic on cloud object stores like GCS where:
   
   - There is no "deletes win" semantic (unlike the HDFS NameNode)
   - Rename is not atomic without Hierarchical Namespace (HNS) enabled
   - Eventually consistent listings can miss cleanup operations
   
   The current reconciliation in finalizeWrite only deletes files not referenced in WriteStatus, so a zombie that closes its file after reconciliation bypasses this check entirely.
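   To make the collision concrete, here is a minimal sketch of how a write token built from Spark task identifiers yields distinct file names across stage attempts. The helper names and exact format here are illustrative, not Hudi's actual FSUtils code:
   
   ```java
   public class WriteTokenSketch {
       // A retried stage attempt gets new stage/attempt identifiers, so the
       // write token (and hence the file name) differs from the zombie's.
       public static String makeWriteToken(int taskPartitionId, int stageId, long taskAttemptId) {
           return String.format("%d-%d-%d", taskPartitionId, stageId, taskAttemptId);
       }
   
       // Simplified base file name: <fileId>_<writeToken>_<instantTime>.parquet
       public static String makeBaseFileName(String fileId, String writeToken, String instantTime) {
           return String.format("%s_%s_%s.parquet", fileId, writeToken, instantTime);
       }
   
       public static void main(String[] args) {
           String zombie  = makeBaseFileName("f1", makeWriteToken(0, 3, 7), "20240101000000");
           String retried = makeBaseFileName("f1", makeWriteToken(0, 4, 8), "20240101000000");
           // Same file group, same instant, but distinct names: the zombie's
           // file is never overwritten and stays visible to query engines.
           System.out.println(zombie.equals(retried)); // prints false
       }
   }
   ```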
   
   **Scope of impact:**
   
   - Affects all Hudi write operations (insert, upsert, bulk_insert) on Spark
   - Most severe on cloud object stores (GCS, S3), but also possible on HDFS under heavy contention
   - Can cause silent data duplication that is difficult to detect and repair
   
   ### User Experience
   
   **How users will use this feature:**
   
   **Configuration:** Enable via a single config property:
   
   `hoodie.write.intermediate.file.enabled = true`
   
   The default is false for a safe, controlled rollout.
   
   **No API changes:** Transparent to all existing write paths (insert, upsert, 
bulk_insert, compaction). WriteStatus always reports the final data partition 
path regardless of whether staging is enabled.
   
   **No query-side changes:** Query engines (Spark, Presto, Trino, Hive) 
continue reading from data partitions as before. Intermediate files in the 
marker directory are invisible to queries.
   
   **Cleanup is automatic:** Zombie intermediate files are cleaned up by the 
existing quietDeleteMarkerDir() call in postCommit. No additional cleanup 
configuration or operations are needed.
   
   **Rollback safe:** If the process crashes before commit, the marker 
directory (containing intermediate files) is cleaned up by Hudi's existing 
rollback mechanism.
   
   **Usage example:**
   
   ```java
   HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
       .withPath(basePath)
       .withIntermediateFileWriteEnabled(true)
       .build();
   ```
   
   ### Implementation Summary
   **Write phase:**
   - Data files are created in `.hoodie/.temp/<instantTime>/<partitionPath>/` with a `__hudi_intermediate_` prefix
   - The data partition directory is NOT created eagerly during write
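   A hypothetical helper showing the staging and final locations described above. The path layout and prefix are taken from this description; `stagingPath` and `finalPath` are illustrative names, not existing Hudi APIs:
   
   ```java
   import java.nio.file.Path;
   import java.nio.file.Paths;
   
   public class StagingPathSketch {
       public static final String INTERMEDIATE_PREFIX = "__hudi_intermediate_";
   
       // Staging: <basePath>/.hoodie/.temp/<instantTime>/<partitionPath>/__hudi_intermediate_<fileName>
       public static Path stagingPath(String basePath, String instantTime,
                                      String partitionPath, String dataFileName) {
           return Paths.get(basePath, ".hoodie", ".temp", instantTime,
                   partitionPath, INTERMEDIATE_PREFIX + dataFileName);
       }
   
       // Final, query-visible location: <basePath>/<partitionPath>/<fileName>
       public static Path finalPath(String basePath, String partitionPath, String dataFileName) {
           return Paths.get(basePath, partitionPath, dataFileName);
       }
   
       public static void main(String[] args) {
           // Queries list only the data partition, so the staged copy is invisible.
           System.out.println(stagingPath("/tbl", "20240101000000",
                   "2024/01/01", "f1_0-3-7_20240101000000.parquet"));
           System.out.println(finalPath("/tbl", "2024/01/01",
                   "f1_0-3-7_20240101000000.parquet"));
       }
   }
   ```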
   
   **Finalize phase (finalizeWrite):**
   - Files referenced in WriteStatus are renamed from the marker directory to the data partition
   - The target directory is created via mkdirs before rename
   - Renames are parallelized using HoodieEngineContext
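   The finalize steps above could be sketched with plain `java.nio.file` operations. Here `parallelStream` stands in for `HoodieEngineContext`-based parallelism, and the prefix-stripping logic is an assumption based on the write-phase description:
   
   ```java
   import java.io.IOException;
   import java.io.UncheckedIOException;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.nio.file.StandardCopyOption;
   import java.util.List;
   
   public class FinalizeSketch {
       // Promote each staged file referenced in WriteStatus to the data
       // partition. Unreferenced (zombie) files are simply left behind in the
       // marker directory for postCommit to delete wholesale.
       public static void promote(List<Path> stagedFiles, Path dataPartition) {
           try {
               Files.createDirectories(dataPartition); // mkdirs before rename
           } catch (IOException e) {
               throw new UncheckedIOException(e);
           }
           stagedFiles.parallelStream().forEach(staged -> {
               String name = staged.getFileName().toString()
                       .replaceFirst("^__hudi_intermediate_", "");
               try {
                   Files.move(staged, dataPartition.resolve(name),
                           StandardCopyOption.REPLACE_EXISTING);
               } catch (IOException e) {
                   throw new UncheckedIOException(e);
               }
           });
       }
   
       public static void main(String[] args) throws IOException {
           Path marker = Files.createTempDirectory("marker");
           Path staged = Files.writeString(
                   marker.resolve("__hudi_intermediate_f1_0-3-7_t.parquet"), "data");
           Path partition = marker.resolve("partition");
           promote(List.of(staged), partition);
           System.out.println(Files.exists(partition.resolve("f1_0-3-7_t.parquet"))); // prints true
       }
   }
   ```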
   
   **Cleanup:**
   - postCommit calls quietDeleteMarkerDir(), which recursively deletes the marker directory, cleaning up any orphaned intermediate files left by zombies
   - No additional sweep logic is needed in clean or rollback
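   A rough sketch of what a quiet recursive delete of the marker directory does. Hudi's real `quietDeleteMarkerDir` is the existing helper referenced above; this version merely illustrates the best-effort, failure-swallowing behavior:
   
   ```java
   import java.io.IOException;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.util.Comparator;
   import java.util.stream.Stream;
   
   public class CleanupSketch {
       // Best-effort recursive delete of the marker directory; any zombie
       // intermediate files still inside are removed along with it.
       public static void quietDeleteMarkerDir(Path markerDir) {
           try (Stream<Path> walk = Files.walk(markerDir)) {
               walk.sorted(Comparator.reverseOrder()) // children before parents
                   .forEach(p -> {
                       try {
                           Files.delete(p);
                       } catch (IOException ignored) {
                           // best-effort: skip entries we cannot delete
                       }
                   });
           } catch (IOException ignored) {
               // "quiet": cleanup failures must not fail the commit
           }
       }
   
       public static void main(String[] args) throws IOException {
           Path marker = Files.createTempDirectory("marker");
           Files.writeString(marker.resolve("__hudi_intermediate_zombie.parquet"), "stray");
           quietDeleteMarkerDir(marker);
           System.out.println(Files.exists(marker)); // prints false
       }
   }
   ```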
   
   **Rename cost:**
   - HDFS: metadata-only operation (sub-millisecond)
   - GCS with HNS: metadata-only operation
   - GCS without HNS: copy + delete (bounded by file size, but acceptable for correctness)
   
   
   ### Hudi RFC Requirements
   
   RFC PR link: N/A
   
   **Why an RFC is/isn't needed:**
   
   - **Does this change public interfaces/APIs?** No -- it adds a new opt-in config property only
   - **Does this change storage format?** No -- final data files have identical names and locations
   - **Justification:** This is a targeted correctness fix behind a feature flag. It reuses existing infrastructure (the marker directory, finalizeWrite, postCommit cleanup) and does not alter the commit protocol, storage layout, or public APIs. An RFC is not required.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
