wombatu-kun opened a new pull request, #16548:
URL: https://github.com/apache/iceberg/pull/16548

   Closes #16546.
   
   ## Summary
   
   `TestMonitorSource.testStateRestore` (the `testStateRestore(File, 
ClusterClient)` variant in the Flink v2.0/v2.1 trees) intermittently fails with 
a `TimeoutException` from `CollectingSink.poll` ([example CI 
run](https://github.com/apache/iceberg/actions/runs/26314549114/job/77471777704)).
 The timeout only means the sink queue stayed empty for 5s; the real cause is a 
savepoint-completion race in the shared test helper, not slow startup.
   
   `OperatorTestBase.closeJobClient(JobClient, File)` discarded the 
`CompletableFuture<String>` returned by `stopWithSavepoint` and instead waited 
for the savepoint *directory* to appear on disk. That directory is created 
early in the savepoint process, before the `_metadata`/state files finish 
writing. Phase 2 of the test then restores from that path via 
`clusterClient.submitJob`; when the restore races savepoint completion, the 
restored job never comes up and emits nothing, so the poll times out.
   
   ## What changed
   
   - `OperatorTestBase.closeJobClient` now awaits the `stopWithSavepoint(...)` 
future and returns the path it resolves to, so the savepoint is guaranteed to 
be fully written before any job restores from it. This mirrors the existing 
idiom in `TestIcebergSourceFailover.testBoundedWithSavepoint`, which awaits the 
savepoint future with `.get()`. The only caller that passes a non-null 
savepoint directory is `TestMonitorSource.testStateRestore`, so the change is 
scoped to this test.
   - As a backstop for restored-job startup latency on busy CI, the Phase 2 
poll in `testStateRestore` is raised from 5s to 30s. The assertion stays strict 
— a genuine re-read emits a non-empty event quickly and still fails fast — so 
the longer timeout only extends the wait for the (correct) first event, 
mirroring the "deterministic fix + generous timeout backstop" pattern used when 
`TestIcebergSourceFailover` was de-flaked.
   
   Both changes are applied identically to the Flink v2.0 and v2.1 trees. The 
v1.20 variant uses `env.executeAsync` rather than `clusterClient.submitJob` and 
is not affected.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to