nbalajee opened a new issue, #18395:
URL: https://github.com/apache/hudi/issues/18395
### Feature Description
**What the feature achieves:**
Introduces an intermediate file staging mechanism that prevents zombie/stray
Spark executors from leaving duplicate data files visible to query engines.
When enabled, data files are written to a staging area (the marker directory)
with a distinguishing prefix, and only promoted to the query-visible data
partition during finalizeWrite. Files left behind by zombie executors are
automatically cleaned up when the marker directory is deleted in postCommit.
**Why this feature is needed:**
In Hudi's Spark commit protocol, when a stage is retried, executors from the
failed attempt (zombies) may continue running and finalize their data files
after finalizeWrite has already completed -- or even after the commit is
written. Because Hudi's file naming encodes taskAttemptId in the write token,
these zombie-written files have unique names and are not overwritten by the
successful attempt. They become visible to query engines, causing duplicate
data.
This is particularly problematic on cloud object stores like GCS, where:
- There is no "deletes win" semantic (unlike the HDFS NameNode)
- Rename is not atomic without Hierarchical Namespace (HNS) enabled
- Eventually consistent listings can miss cleanup operations
The current reconciliation in finalizeWrite only deletes files not
referenced in WriteStatus, but a zombie that closes its file after
reconciliation bypasses this check entirely.
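The reconciliation gap described above is essentially a set difference: finalizeWrite deletes files present in the partition listing but absent from WriteStatus, so a zombie file that only becomes visible after the listing is taken is never a candidate for deletion. A minimal sketch of that logic (hypothetical names, not the actual Hudi implementation):

```java
import java.util.Set;
import java.util.stream.Collectors;

class ReconcileSketch {
    // Delete any file seen on storage that WriteStatus does not reference.
    static Set<String> filesToDelete(Set<String> listedFiles, Set<String> committedFiles) {
        return listedFiles.stream()
                .filter(f -> !committedFiles.contains(f))
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        Set<String> committed = Set.of("f1_attempt2.parquet");
        // Listing taken during finalizeWrite: the zombie has not closed its file yet,
        // so nothing extra is listed and nothing is deleted.
        System.out.println(filesToDelete(Set.of("f1_attempt2.parquet"), committed));
        // The zombie closes its file AFTER reconciliation: it was never listed,
        // so it survives and is visible to query engines as a duplicate.
        System.out.println(filesToDelete(
                Set.of("f1_attempt2.parquet", "f1_attempt1.parquet"), committed));
    }
}
```

Because the write token encodes taskAttemptId, the zombie's file name never collides with the successful attempt's, so it is not overwritten either.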
**Scope of impact:**
- Affects all Hudi write operations (insert, upsert, bulk_insert) on Spark
- Most severe on cloud object stores (GCS, S3), but also possible on HDFS under heavy contention
- Can cause silent data duplication that is difficult to detect and repair
### User Experience
**How users will use this feature:**
**Configuration:** Enable via a single config property:
`hoodie.write.intermediate.file.enabled = true`
The default is `false`, allowing a safe, controlled rollout.
**No API changes:** Transparent to all existing write paths (insert, upsert,
bulk_insert, compaction). WriteStatus always reports the final data partition
path regardless of whether staging is enabled.
**No query-side changes:** Query engines (Spark, Presto, Trino, Hive)
continue reading from data partitions as before. Intermediate files in the
marker directory are invisible to queries.
**Cleanup is automatic:** Zombie intermediate files are cleaned up by the
existing quietDeleteMarkerDir() call in postCommit. No additional cleanup
configuration or operations are needed.
**Rollback safe:** If the process crashes before commit, the marker
directory (containing intermediate files) is cleaned up by Hudi's existing
rollback mechanism.
**Usage example:**
```java
HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
    .withPath(basePath)
    .withIntermediateFileWriteEnabled(true)
    .build();
```
### Implementation Summary
**Write phase:**
- Data files are created in `.hoodie/.temp/<instantTime>/<partitionPath>/` with the `__hudi_intermediate_` prefix
- The data partition directory is NOT created eagerly during write
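The staging layout above can be sketched with java.nio (helper and class names are hypothetical; the real implementation would go through Hudi's storage abstraction rather than local paths):

```java
import java.nio.file.Path;
import java.nio.file.Paths;

class StagingPathSketch {
    static final String INTERMEDIATE_PREFIX = "__hudi_intermediate_";

    // Build the staging location for a data file: it lives under the marker
    // directory (.hoodie/.temp/<instantTime>/<partitionPath>/) with a
    // distinguishing prefix, instead of in the data partition directly.
    static Path intermediatePath(String basePath, String instantTime,
                                 String partitionPath, String fileName) {
        return Paths.get(basePath, ".hoodie", ".temp", instantTime,
                partitionPath, INTERMEDIATE_PREFIX + fileName);
    }
}
```

Because nothing is written under the data partition until finalizeWrite, a zombie executor that keeps writing only ever touches the marker directory.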
**Finalize phase (finalizeWrite):**
- Files referenced in WriteStatus are renamed from the marker directory to the data partition
- The target directory is created via mkdirs before the rename
- Renames are parallelized using HoodieEngineContext
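The promotion step can be sketched as follows (a local-filesystem stand-in with hypothetical names; the real code would use Hudi's FileSystem abstraction and parallelize across files via HoodieEngineContext):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

class FinalizeSketch {
    // Promote one staged file into the query-visible data partition:
    // create the target directory first (mkdirs), then rename, dropping
    // the staging prefix so the final file name is unchanged from today.
    static Path promote(Path stagedFile, Path dataPartitionDir) throws IOException {
        Files.createDirectories(dataPartitionDir);          // mkdirs before rename
        String finalName = stagedFile.getFileName().toString()
                .replaceFirst("^__hudi_intermediate_", "");
        return Files.move(stagedFile, dataPartitionDir.resolve(finalName),
                StandardCopyOption.ATOMIC_MOVE);
    }
}
```

On HDFS and GCS with HNS this rename is a metadata-only operation; on GCS without HNS it degrades to copy + delete, as noted under rename cost below.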
**Cleanup:**
- postCommit calls quietDeleteMarkerDir(), which recursively deletes the marker directory, cleaning up any orphaned intermediate files from zombies
- No additional sweep logic is needed in clean or rollback
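The cleanup step amounts to a quiet recursive delete of the marker directory; whatever a zombie staged there disappears with it. A local-filesystem sketch (hypothetical class name; quietDeleteMarkerDir() itself already exists in Hudi):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

class CleanupSketch {
    // Recursively delete the marker directory. "Quiet" means cleanup
    // failures are swallowed rather than failing the already-durable commit.
    static void quietDeleteMarkerDir(Path markerDir) {
        if (!Files.exists(markerDir)) {
            return;
        }
        try (Stream<Path> walk = Files.walk(markerDir)) {
            walk.sorted(Comparator.reverseOrder())   // delete children before parents
                .forEach(p -> {
                    try {
                        Files.delete(p);
                    } catch (IOException ignored) {
                        // best-effort: leftover files are retried by later cleanups
                    }
                });
        } catch (IOException ignored) {
        }
    }
}
```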
**Rename cost:**
- HDFS: metadata-only operation (sub-millisecond)
- GCS with HNS: metadata-only operation
- GCS without HNS: copy + delete (bounded by file size, but acceptable for correctness)
### Hudi RFC Requirements
RFC PR link: N/A
**Why an RFC is/isn't needed:**
- **Does this change public interfaces/APIs?** No -- it adds a new opt-in config property only
- **Does this change the storage format?** No -- final data files have identical names and locations
- **Justification:** This is a targeted correctness fix behind a feature flag. It reuses existing infrastructure (the marker directory, finalizeWrite, and postCommit cleanup) and does not alter the commit protocol, storage layout, or public APIs. An RFC is not required.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]