This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new b806117bfa Fix flaky SpillPool channel test by synchronizing reader and writer tasks (#19110)
b806117bfa is described below

commit b806117bfa80a4c80afbcbfcc024abf1c77feba5
Author: kosiew <[email protected]>
AuthorDate: Wed Dec 17 11:30:59 2025 +0800

    Fix flaky SpillPool channel test by synchronizing reader and writer tasks (#19110)
    
    ## Which issue does this PR close?
    
    * Closes #19058.
    
    ## Rationale for this change
    
    The `spill_pool` channel test `test_reader_catches_up_to_writer` was
    flaky due to non-deterministic coordination between the reader and
    writer tasks. The test used time-based sleeps and polling on shared
    state to infer when the reader had started and when it had processed a
    batch. Under varying scheduler timing, this could cause the reader to
    miss events or observe them in a different order, leading to
    intermittent failures where the recorded event sequence did not match
    expectations (for example, observing `3` instead of `5` reads).
    
    Since this test verifies the correctness and wakeup behavior of the
    spill channel used by the spill pool, flakiness here undermines
    confidence in the spill mechanism and can cause spurious CI failures.
    
    This PR makes the test coordination explicit and deterministic using
    `oneshot` channels, and also improves the usage example for the spill
    channel to show how to run writer and reader concurrently in a robust
    way.
    
    ## What changes are included in this PR?
    
    1. **Example: concurrent writer and reader usage**

       * Update the `spill_pool::channel` usage example to:

         * Spawn writer and reader tasks concurrently instead of only spawning the writer.
         * Use `writer.push_batch(&batch)?` so the example returns a `Result` and propagates errors correctly.
         * Explicitly `drop(writer)` at the end of the writer task to finalize the spill file and wake the reader.
         * Use `tokio::join!` to await both tasks and map join errors into `DataFusionError::Execution`.
         * Assert that the reader sees all expected batches (`batches_read == 5`).
       * The updated example better demonstrates the intended concurrent usage pattern of the spill channel and ensures the reader is correctly woken when the writer finishes.
    
    2. **Test: make `test_reader_catches_up_to_writer` deterministic**

       * Introduce two `oneshot` channels:

         * `reader_waiting_tx/rx` to signal when the reader has started and is pending on its first `next()` call.
         * `first_read_done_tx/rx` to signal when the reader has completed processing the first batch.
       * In the reader task:

         * Record `ReadStart` and send on `reader_waiting_tx` before awaiting `reader.next()`.
         * After successfully reading and recording the first batch, send on `first_read_done_tx`.
         * Then read the second batch as before.
       * In the test body:

         * Wait on `reader_waiting_rx` instead of sleeping for a fixed duration, ensuring the reader is actually pending before writing the first batch.
         * After the first write, wait on `first_read_done_rx` before issuing the second write.
       * This establishes a precise and documented sequence of events:

         1. Reader starts and pends on the first `next()`.
         2. First write occurs, waking the reader.
         3. Reader processes the first batch and signals completion.
         4. Second write occurs.
       * With this explicit synchronization, the event ordering in the test is stable and no longer depends on scheduler timing or arbitrary sleeps, eliminating the flakiness.
    
    ## Are these changes tested?
    
    Yes.
    
    ```
    for i in {1..200}; do
      echo "Run #$i started"
      cargo test -p datafusion-physical-plan --profile ci  --doc -q || break
      echo "Run #$i completed"
    done
    ```
    
    * The modified test `test_reader_catches_up_to_writer` continues to run as part of the existing `spill_pool` test suite, but now uses explicit synchronization instead of timing-based assumptions.
    * The test has been exercised repeatedly to confirm that:

      * The expected read/write event sequence is stable across runs.
      * The intermittent assertion failures (e.g., mismatched read counts such as `3` vs `5`) no longer occur.
    * The updated example code compiles and type-checks by returning `datafusion_common::Result` from both spawned tasks and from the combined `tokio::join!` result.
    
    ## Are there any user-facing changes?
    
    There are no behavior changes to the public API or spill pool semantics.
    
    * The spill channel and spill pool behavior remains the same at runtime.
    * Only the documentation/example and the internal test harness have been updated.
    * No configuration flags or public methods were added, removed, or changed, so there are no breaking changes or documentation requirements beyond what is already updated inline.
    
    ## LLM-generated code disclosure
    
    This PR includes LLM-generated code and comments. All LLM-generated
    content has been manually reviewed and tested.
---
 datafusion/physical-plan/src/spill/spill_pool.rs | 67 ++++++++++++++----------
 1 file changed, 40 insertions(+), 27 deletions(-)

diff --git a/datafusion/physical-plan/src/spill/spill_pool.rs b/datafusion/physical-plan/src/spill/spill_pool.rs
index bbe54ca45c..e3b547b573 100644
--- a/datafusion/physical-plan/src/spill/spill_pool.rs
+++ b/datafusion/physical-plan/src/spill/spill_pool.rs
@@ -384,28 +384,33 @@ impl Drop for SpillPoolWriter {
 /// // Create channel with 1MB file size limit
 /// let (writer, mut reader) = spill_pool::channel(1024 * 1024, spill_manager);
 ///
-/// // Spawn writer task to produce batches
-/// let write_handle = tokio::spawn(async move {
+/// // Spawn writer and reader concurrently; writer wakes reader via wakers
+/// let writer_task = tokio::spawn(async move {
 ///     for i in 0..5 {
 ///         let array: ArrayRef = Arc::new(Int32Array::from(vec![i; 100]));
 ///         let batch = RecordBatch::try_new(schema.clone(), vec![array]).unwrap();
-///         writer.push_batch(&batch).unwrap();
+///         writer.push_batch(&batch)?;
 ///     }
-///     // Writer dropped here, finalizing current file
+///     // Explicitly drop writer to finalize the spill file and wake the reader
+///     drop(writer);
+///     datafusion_common::Result::<()>::Ok(())
 /// });
 ///
-/// // Reader consumes batches in FIFO order (can run concurrently with writer)
-/// let mut batches_read = 0;
-/// while let Some(result) = reader.next().await {
-///     let batch = result?;
-///     batches_read += 1;
-///     // Process batch...
-///     if batches_read == 5 {
-///         break; // Got all expected batches
+/// let reader_task = tokio::spawn(async move {
+///     let mut batches_read = 0;
+///     while let Some(result) = reader.next().await {
+///         let _batch = result?;
+///         batches_read += 1;
 ///     }
-/// }
+///     datafusion_common::Result::<usize>::Ok(batches_read)
+/// });
+///
+/// let (writer_res, reader_res) = tokio::join!(writer_task, reader_task);
+/// writer_res
+///     .map_err(|e| datafusion_common::DataFusionError::Execution(e.to_string()))??;
+/// let batches_read = reader_res
+///     .map_err(|e| datafusion_common::DataFusionError::Execution(e.to_string()))??;
 ///
-/// write_handle.await.unwrap();
 /// assert_eq!(batches_read, 5);
 /// # Ok(())
 /// # }
@@ -1173,6 +1178,9 @@ mod tests {
     async fn test_reader_catches_up_to_writer() -> Result<()> {
         let (writer, mut reader) = create_spill_channel(1024 * 1024);
 
+        let (reader_waiting_tx, reader_waiting_rx) = tokio::sync::oneshot::channel();
+        let (first_read_done_tx, first_read_done_rx) = tokio::sync::oneshot::channel();
+
         #[derive(Clone, Copy, Debug, PartialEq, Eq)]
         enum ReadWriteEvent {
             ReadStart,
@@ -1185,36 +1193,41 @@ mod tests {
         let reader_events = Arc::clone(&events);
         let reader_handle = SpawnedTask::spawn(async move {
             reader_events.lock().push(ReadWriteEvent::ReadStart);
+            reader_waiting_tx
+                .send(())
+                .expect("reader_waiting channel closed unexpectedly");
             let result = reader.next().await.unwrap().unwrap();
             reader_events
                 .lock()
                 .push(ReadWriteEvent::Read(result.num_rows()));
+            first_read_done_tx
+                .send(())
+                .expect("first_read_done channel closed unexpectedly");
             let result = reader.next().await.unwrap().unwrap();
             reader_events
                 .lock()
                 .push(ReadWriteEvent::Read(result.num_rows()));
         });
 
-        // Give reader time to start pending
-        tokio::time::sleep(std::time::Duration::from_millis(5)).await;
+        // Wait until the reader is pending on the first batch
+        reader_waiting_rx
+            .await
+            .expect("reader should signal when waiting");
 
         // Now write a batch (should wake the reader)
         let batch = create_test_batch(0, 5);
         events.lock().push(ReadWriteEvent::Write(batch.num_rows()));
         writer.push_batch(&batch)?;
 
-        // Wait for the reader to process
-        let processed = async {
-            loop {
-                if events.lock().len() >= 3 {
-                    break;
-                }
-                tokio::time::sleep(std::time::Duration::from_micros(500)).await;
-            }
-        };
-        tokio::time::timeout(std::time::Duration::from_secs(1), processed)
+        // Wait for the reader to finish the first read before allowing the
+        // second write. This ensures deterministic ordering of events:
+        // 1. The reader starts and pends on the first `next()`
+        // 2. The first write wakes the reader
+        // 3. The reader processes the first batch and signals completion
+        // 4. The second write is issued, ensuring consistent event ordering
+        first_read_done_rx
             .await
-            .unwrap();
+            .expect("reader should signal when first read completes");
 
         // Write another batch
         let batch = create_test_batch(5, 10);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
