shangxinli opened a new issue, #18783:
URL: https://github.com/apache/hudi/issues/18783

   ### Bug Description
   
   **What happened:**
   
   A failed Hudi write can leave behind orphan parquet files — either 
truncated/incomplete files or complete-but-uncommitted files belonging to an 
aborted instant. Hudi's Rollback action is responsible for removing them, using 
either the marker-based strategy (`.hoodie/.temp/<instant>/`) or the 
listing-based strategy (matching files against the failed instant's commit time 
embedded in the filename).
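
The listing-based match can be illustrated with a minimal Java sketch that extracts the instant time from a base file name and compares it with the failed instant. It assumes the `<fileId>_<writeToken>_<instant>.parquet` layout quoted later in this issue, and it assumes that none of the three parts contains an underscore. The class and method names are hypothetical, not Hudi APIs.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper, not Hudi code: match base files to a failed instant by file name. */
public class BaseFileInstantMatcher {

    // Assumed layout: <fileId>_<writeToken>_<instantTime>.parquet, with no '_' inside each part.
    private static final Pattern BASE_FILE =
            Pattern.compile("^(?<fileId>[^_]+)_(?<writeToken>[^_]+)_(?<instant>\\d+)\\.parquet$");

    /** Returns the instant time embedded in a base file name, if the name matches the layout. */
    public static Optional<String> instantOf(String fileName) {
        Matcher m = BASE_FILE.matcher(fileName);
        return m.matches() ? Optional.of(m.group("instant")) : Optional.empty();
    }

    /** True if the file was written by the given (failed) instant. */
    public static boolean belongsTo(String fileName, String failedInstant) {
        return instantOf(fileName).map(failedInstant::equals).orElse(false);
    }

    public static void main(String[] args) {
        String failed = "20240501101500123";
        System.out.println(belongsTo("f1-0_1-0-1_20240501101500123.parquet", failed)); // true
        System.out.println(belongsTo("f2-0_1-0-1_20240501093000456.parquet", failed)); // false
    }
}
```

Names that do not fit the layout (for example MoR log files) yield an empty `Optional`, so they are never matched by accident and need their own pattern.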
   
   If Rollback itself partially fails — crash mid-rollback, marker loss, or a 
blocked storage write that lands **after** rollback already completed (for 
example, a slow object-store `close()` call that returns long after the writer 
task was cancelled by the engine) — some orphan files persist on disk.
   
   While the failed instant and its rollback record remain in the active 
timeline, the orphans are still recoverable: a retried rollback or a 
reconciliation pass can match the orphan's filename against the failed instant 
and delete it.
   
   Once Archive moves the rollback record out of the active timeline (and 
eventually past archived-timeline retention), the system loses every metadata 
anchor that would let any process identify those files as orphans. Critically, 
orphans from failed writes typically have a `fileId` with **no surviving 
committed sibling slice**, so the reader's "latest slice within a file group" 
logic treats the orphan as a legitimate one-slice file group.
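
The jump from "archived" to "reader-visible" can be modelled in a few lines. The following is a simplified Java sketch built on an explicit assumption about the reader. It treats a file as committed when its instant is a completed instant in the active timeline. It also treats a file as committed when its instant is older than the start of the active timeline, on the premise that anything older was committed and then archived. The class is hypothetical and does not reproduce Hudi's `FileSystemView`.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Simplified, hypothetical model of reader-side slice selection; not Hudi's FileSystemView. */
public class SliceVisibilityModel {

    /** A data file reduced to the two fields this model needs. */
    public static final class DataFile {
        final String fileId;
        final String instant;

        public DataFile(String fileId, String instant) {
            this.fileId = fileId;
            this.instant = instant;
        }
    }

    /**
     * Assumed rule: a file is visible if its instant completed in the active timeline,
     * or if its instant predates the active timeline (presumed committed, then archived).
     */
    static boolean isVisible(String instant, Set<String> completedActive, String earliestActive) {
        return completedActive.contains(instant) || instant.compareTo(earliestActive) < 0;
    }

    /** Latest visible slice per file group, as fileId -> instant. */
    public static Map<String, String> latestSlices(List<DataFile> files,
                                                   Set<String> completedActive,
                                                   String earliestActive) {
        Map<String, String> latest = new TreeMap<>();
        for (DataFile f : files) {
            if (isVisible(f.instant, completedActive, earliestActive)) {
                // Keep the newer instant; fixed-width instant strings sort chronologically.
                latest.merge(f.fileId, f.instant, (a, b) -> a.compareTo(b) >= 0 ? a : b);
            }
        }
        return latest;
    }

    public static void main(String[] args) {
        List<DataFile> files = List.of(
                new DataFile("f1", "100"),   // committed write
                new DataFile("f2", "200"));  // orphan of failed instant 200, no committed sibling
        // Before archive: the active timeline starts at 100, so uncommitted 200 is filtered out.
        System.out.println(latestSlices(files, Set.of("100", "300"), "100")); // {f1=100}
        // After archive: the active timeline starts at 300, so 200 now predates it.
        System.out.println(latestSlices(files, Set.of("300"), "300"));        // {f1=100, f2=200}
    }
}
```

Under this model, the orphan `f2` is the only slice of its file group, so no newer committed slice shadows it once its instant falls before the active timeline.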
   
   End-user impact:
   - **Incomplete files** → readers fail with corrupt/invalid parquet errors
   - **Complete-but-uncommitted files** → readers return duplicate records
   
   This violates the reader/writer isolation guarantee Hudi provides via the 
timeline.
   
   **What you expected:**
   
   Files created by writes that ultimately did not commit should never become 
reader-visible, regardless of how long the table runs or whether Archive has 
pruned the rollback record. The reader/writer isolation guarantee should hold 
across the entire lifecycle of the table, not only while the failed instant 
happens to still be in the active timeline.
   
   **Steps to reproduce:**
   
   1. Configure any Hudi write path (Flink or Spark; both reproduce). Use 
`hoodie.clean.failed.writes.policy=LAZY` (typical for multi-writer or Flink 
ingestion).
   2. Induce a partial-write failure in which the data file lands but the instant never commits. One reliable trigger seen in production is a blocked storage `close()` that returns after the engine has already cancelled the writer task. Any storage layer with intermittent congestion or blocking-close semantics under load (object stores, certain HDFS configurations, etc.) makes this easy to observe.
   3. Let auto-rollback fire (the next writer detects the expired heartbeat). Verify that some orphan files remain, for example because the blocked `close()` had not yet landed when rollback ran its listing-based scan.
   4. Let archival (governed by `hoodie.keep.min.commits` / `hoodie.keep.max.commits`) move the failed instant and its rollback record out of the active timeline.
   5. Query the table from any reader path (Spark, Flink, Presto/Trino, Hive). 
Depending on whether the orphan is truncated or complete, the query either 
fails with parquet read errors or returns duplicate records.
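
Steps 2 and 3 come down to an ordering problem between the rollback's listing and the late `close()`. The following deterministic Java simulation uses in-memory sets and no Hudi classes; the file names and the `rollback` helper are made up for illustration. It shows why a listing-based rollback cannot delete a file that does not yet exist when it scans.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Deterministic, in-memory simulation of reproduction steps 2-3; not Hudi code. */
public class LateCloseSimulation {

    /** Listing-based rollback model: delete every listed file ending in _<instant>.parquet. */
    static List<String> rollback(Set<String> partition, String failedInstant) {
        List<String> deleted = new ArrayList<>();
        for (String f : new ArrayList<>(partition)) { // iterate over a snapshot of the listing
            if (f.endsWith("_" + failedInstant + ".parquet")) {
                partition.remove(f);
                deleted.add(f);
            }
        }
        return deleted;
    }

    /** Returns the partition contents after rollback and the late close() have both run. */
    public static Set<String> run(String failedInstant) {
        Set<String> partition = new TreeSet<>();
        partition.add("f1-0_0-1-0_100.parquet");                   // committed earlier
        partition.add("f2-0_0-2-0_" + failedInstant + ".parquet"); // landed before the failure
        // The task writing f3 was cancelled while its close() was still blocked in storage;
        // the file only appears when that close() finally returns.
        Runnable blockedClose = () -> partition.add("f3-0_0-2-0_" + failedInstant + ".parquet");

        rollback(partition, failedInstant); // step 3: rollback deletes what it can list
        blockedClose.run();                 // the late close() lands after rollback completed
        return partition;
    }

    public static void main(String[] args) {
        // Prints [f1-0_0-1-0_100.parquet, f3-0_0-2-0_200.parquet]: f3 is the orphan.
        System.out.println(run("200"));
    }
}
```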
   
   ### Environment
   
   **Hudi version:** Reproducible on master; the structural issue applies to 
all current versions.
   **Query engine:** Spark, Flink, Trino, Hive (any engine that lists partition 
files via Hudi's `FileSystemView`).
   **Relevant configs:**
   - `hoodie.client.heartbeat.interval_in_ms=60000` (default)
   - `hoodie.client.heartbeat.tolerable.misses=2` (default; the short window before a stalled writer is declared failed increases the chance that a `close()` lands after rollback completes)
   - `hoodie.clean.failed.writes.policy=LAZY`
   - Default archive retention (`hoodie.keep.min.commits` / 
`hoodie.keep.max.commits`)
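
The two heartbeat defaults above can be turned into a rough estimate of the rollback trigger window. This Java sketch reads the two properties, falling back to the default values stated in this issue. It assumes that a heartbeat counts as expired once it is older than interval × tolerable misses. That formula is an assumption about the heartbeat client and should be checked against the Hudi version in use. The class is hypothetical.

```java
import java.util.Properties;

/** Hypothetical helper: how long a stalled writer has before its heartbeat counts as expired. */
public class HeartbeatWindow {

    /** Assumption: expiry threshold = heartbeat interval x tolerable misses. */
    public static long expiryWindowMs(Properties props) {
        long intervalMs = Long.parseLong(
                props.getProperty("hoodie.client.heartbeat.interval_in_ms", "60000"));
        int tolerableMisses = Integer.parseInt(
                props.getProperty("hoodie.client.heartbeat.tolerable.misses", "2"));
        return intervalMs * tolerableMisses;
    }

    public static void main(String[] args) {
        // With the defaults listed above: 60000 ms x 2 = 120000 ms (two minutes).
        System.out.println(expiryWindowMs(new Properties()));
    }
}
```

Under that assumption, a `close()` that stays blocked for longer than roughly two minutes (plus the time until the next writer starts) can land after the rollback has finished.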
   
   ### Logs and Stack Trace
   
   Representative log sequence on the failing writer's executor:
   
   ```
   WARN  o.a.h.client.heartbeat.HoodieHeartbeatClient - Heartbeat expired, ...
   INFO  o.a.h.client.BaseHoodieTableServiceClient - Begin rollback of instant <T_failed>
   INFO  o.a.h.metrics.HoodieMetrics - Sending rollback metrics (duration=<...>, numFilesDeleted=<N>)
   INFO  o.a.h.client.heartbeat.HeartbeatUtils - Deleted the heartbeat for instant <T_failed>
   ```
   
   Then, **after** the rollback has completed, the blocked storage `close()` 
returns and a new parquet file appears at 
`<partition>/<fileId>_<writeToken>_<T_failed>.parquet`. There is no entry in 
any timeline file referencing this file's existence.
   
   Downstream reader log (Spark example):
   
   ```
   java.lang.RuntimeException: Failed to read Parquet file ... is not a Parquet file (length is too small)
   ```
   
   (For complete-but-uncommitted files, the symptom is silent duplicate records 
returned by the query — no log line at all.)
   
   ---
   
   ### Proposal
   
   Add an **archive precondition** that prevents a rollback instant from being 
archived until its orphans are provably gone.
   
   Concretely, at the archive planner:
   
   1. For each rollback instant about to be archived, parse its 
`HoodieRollbackMetadata` to obtain:
      - The target instant being rolled back
      - The partitions it touched
      - The list of files it claimed to delete (`successDeleteFiles` ∪ 
`failedDeleteFiles`)
   2. List the partitions named in the rollback metadata and find any files whose embedded instant matches the target instant (filename patterns: `*_<targetInstant>.parquet` for base files; `.<fileId>_<targetInstant>.log.<version>_<writeToken>` for MoR log files).
   3. Cross-check against the rollback's claimed-deleted list. Any file still 
present that wasn't in the claimed-deleted list (or was in `failedDeleteFiles`) 
is a candidate orphan.
   4. Confirm via a timeline lookup that the file's instant is not a `COMPLETED` commit anywhere in the `(active ∪ archived)` timeline.
   5. If candidate orphans remain, **defer archival** of this rollback instant. 
Two reasonable actions:
      - Re-invoke the rollback (which can still find and delete the orphan now 
using markers or listing), or
      - Surface a metric/alert for operator intervention.
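
The five steps can be sketched as a single check. This is a minimal Java illustration that rests on several assumptions. The rollback metadata has already been parsed into a plain `RollbackRecord`, a hypothetical type rather than the Avro `HoodieRollbackMetadata` class. Paths are `partition/fileName` strings, and only base files are matched. Finally, `completedInstants` stands in for a lookup over the active and archived timelines.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of the proposed archive precondition; not an existing Hudi class. */
public class RollbackArchivePrecondition {

    /** Flattened view of the fields step 1 reads from the rollback metadata. */
    public static final class RollbackRecord {
        final String targetInstant;
        final List<String> partitions;
        final Set<String> successDeleteFiles; // "partition/fileName"
        final Set<String> failedDeleteFiles;  // "partition/fileName"

        public RollbackRecord(String targetInstant, List<String> partitions,
                              Set<String> successDeleteFiles, Set<String> failedDeleteFiles) {
            this.targetInstant = targetInstant;
            this.partitions = partitions;
            this.successDeleteFiles = successDeleteFiles;
            this.failedDeleteFiles = failedDeleteFiles;
        }
    }

    public enum Decision { ARCHIVE, DEFER }

    /** Steps 2-4: files of the target instant that still exist and are not accounted for. */
    public static List<String> candidateOrphans(RollbackRecord rb,
                                                Map<String, List<String>> listing,
                                                Set<String> completedInstants) {
        List<String> orphans = new ArrayList<>();
        // Step 4: if the target completed anywhere (active or archived), its files are not orphans.
        if (completedInstants.contains(rb.targetInstant)) {
            return orphans;
        }
        String suffix = "_" + rb.targetInstant + ".parquet"; // step 2, base files only
        for (String partition : rb.partitions) {
            for (String fileName : listing.getOrDefault(partition, List.of())) {
                if (!fileName.endsWith(suffix)) {
                    continue;
                }
                String path = partition + "/" + fileName;
                boolean claimed = rb.successDeleteFiles.contains(path)
                        || rb.failedDeleteFiles.contains(path);
                // Step 3: still present and either never claimed, or claimed but failed to delete.
                if (!claimed || rb.failedDeleteFiles.contains(path)) {
                    orphans.add(path);
                }
            }
        }
        return orphans;
    }

    /** Step 5: archive only when no candidate orphan remains. */
    public static Decision decide(RollbackRecord rb, Map<String, List<String>> listing,
                                  Set<String> completedInstants) {
        return candidateOrphans(rb, listing, completedInstants).isEmpty()
                ? Decision.ARCHIVE : Decision.DEFER;
    }

    public static void main(String[] args) {
        RollbackRecord rb = new RollbackRecord("200", List.of("p1"),
                Set.of("p1/f2-0_0-2-0_200.parquet"), Set.of());
        // Listing taken after the late close() landed: f3 was never seen by the rollback.
        Map<String, List<String>> listing = Map.of("p1",
                List.of("f1-0_0-1-0_100.parquet", "f3-0_0-2-0_200.parquet"));
        System.out.println(candidateOrphans(rb, listing, Set.of("100"))); // [p1/f3-0_0-2-0_200.parquet]
        System.out.println(decide(rb, listing, Set.of("100")));           // DEFER
    }
}
```

If the rollback is re-run (the first option in step 5), it would delete `p1/f3-0_0-2-0_200.parquet`. On the next cycle the listing no longer contains that file, and `decide` returns `ARCHIVE`.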
   
   This is a per-cycle boundary check: it only inspects rollback instants that are about to be archived, so it does not require scanning the entire history every time. The induction property is: as long as no rollback instant is ever archived without proving its orphans gone, no rollback orphan can ever cross into the post-archive state in which nothing identifies it as an orphan.
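
The induction argument depends on the check running wherever an instant leaves the active timeline. Below is a minimal Java sketch of the planner loop. As a design choice, not as a description of Hudi's current archiver, it assumes that each cycle archives a contiguous oldest-first prefix of candidates, so a deferred rollback also holds back newer instants. `rollbackHasOrphans` would be the precondition check from step 5; here it is just a predicate. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical per-cycle archive planning loop; not Hudi's archiver. */
public class ArchivePlanner {

    public static final class Instant {
        final String time;
        final String action; // e.g. "commit", "deltacommit", "rollback"

        public Instant(String time, String action) {
            this.time = time;
            this.action = action;
        }

        @Override
        public String toString() {
            return time + "." + action;
        }
    }

    /**
     * Returns the oldest-first prefix of archive candidates that may be archived this cycle.
     * Only rollbacks at the boundary are checked; the first one with candidate orphans ends
     * the prefix, so it (and everything newer) stays active until a later cycle passes.
     */
    public static List<Instant> plan(List<Instant> candidatesOldestFirst,
                                     Predicate<Instant> rollbackHasOrphans) {
        List<Instant> toArchive = new ArrayList<>();
        for (Instant instant : candidatesOldestFirst) {
            if (instant.action.equals("rollback") && rollbackHasOrphans.test(instant)) {
                break; // defer: re-run the rollback or alert, then retry next cycle
            }
            toArchive.add(instant);
        }
        return toArchive;
    }

    public static void main(String[] args) {
        List<Instant> candidates = List.of(
                new Instant("100", "commit"),
                new Instant("250", "rollback"),
                new Instant("300", "commit"));
        System.out.println(plan(candidates, rb -> rb.time.equals("250"))); // [100.commit]
        System.out.println(plan(candidates, rb -> false)); // [100.commit, 250.rollback, 300.commit]
    }
}
```

Stopping at the first deferred rollback keeps the archived/active boundary at a single point in time. The alternative is to skip only the deferred rollback and archive newer instants, at the cost of an active timeline with gaps.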
   
   **Complementary measure:** extend marker retention. Today markers under 
`.hoodie/.temp/<instant>/` are deleted as the last step of rollback, which 
means by the time a post-rollback orphan appears (for example, from a blocked 
`close()` landing late), the markers are already gone. Retaining markers until 
the rollback instant itself is archived would give the precondition check an 
authoritative file list to verify against, rather than relying on partition 
listings.
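
With markers retained, the precondition could compare storage against the marker list instead of a partition listing. The following is a minimal Java sketch that assumes direct (file-per-marker) markers named `<dataFileName>.marker.<IOType>`, with paths relative to the marker directory. Timeline-server-based markers keep the same information in batched files and would need a different reader. All names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical check against retained markers; not Hudi's marker API. */
public class MarkerBasedCheck {

    // Assumed direct-marker naming: "<dataFileName>.marker.<IOType>", e.g. "x.parquet.marker.CREATE".
    private static final String MARKER_INFIX = ".marker.";

    /** "p1/f3-0_0-2-0_200.parquet.marker.CREATE" -> "p1/f3-0_0-2-0_200.parquet". */
    public static String dataFileOf(String markerPath) {
        int idx = markerPath.lastIndexOf(MARKER_INFIX);
        if (idx < 0) {
            throw new IllegalArgumentException("not a marker path: " + markerPath);
        }
        return markerPath.substring(0, idx);
    }

    /**
     * Data files that a retained marker says the failed instant started writing and that
     * still exist. Assuming a marker is created before its data file is opened, a file whose
     * close() landed after rollback is still covered even though no listing saw it then.
     */
    public static List<String> survivingFiles(List<String> retainedMarkers, Set<String> existingFiles) {
        List<String> survivors = new ArrayList<>();
        for (String marker : retainedMarkers) {
            String dataFile = dataFileOf(marker);
            if (existingFiles.contains(dataFile)) {
                survivors.add(dataFile);
            }
        }
        return survivors;
    }

    public static void main(String[] args) {
        List<String> markers = List.of(
                "p1/f2-0_0-2-0_200.parquet.marker.CREATE",
                "p1/f3-0_0-2-0_200.parquet.marker.CREATE");
        Set<String> existing = Set.of("p1/f1-0_0-1-0_100.parquet", "p1/f3-0_0-2-0_200.parquet");
        System.out.println(survivingFiles(markers, existing)); // [p1/f3-0_0-2-0_200.parquet]
    }
}
```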
   
   **One-time repair:** a CLI or admin tool that runs the same scan against 
archived rollback instants (before they age past archived-timeline retention) 
to clean up any orphans created by pre-existing partial rollbacks before this 
fix lands.
   

