metesynnada opened a new issue, #5278:
URL: https://github.com/apache/arrow-datafusion/issues/5278

   **Describe the bug**
   I am working with the following plan:
   
   ```sql
   === Physical plan ===
   CoalescePartitionsExec
     RepartitionExec: partitioning=Hash([Column { name: "a2", index: 0 }], 3)
       UnboundableExec: unbounded=true
   ```
   
   Using a physical plan with **`CoalescePartitionsExec`** on top of **`RepartitionExec`** causes strange behavior when the input stream contains only one unique value.
   
   The strange behavior: 
   
   - With the **`CoalescePartitionsExec`** + **`RepartitionExec`** physical plan, when a stream with only one unique value is provided, no data is read from the **`RepartitionExec`** until the stream is exhausted. Calling **`wake_receivers()`** does not wake up the **`DistributionReceiver`**. This behavior is not observed without **`CoalescePartitionsExec`**. I suspect that the problem is caused by the spawning of new threads inside **`CoalescePartitionsExec`**, and that the waker needs to be updated:
   https://rust-lang.github.io/async-book/02_execution/03_wakeups.html
   
   - Plans with blocking repartition:
       
       ```
       === Physical plan ===
       CoalescePartitionsExec
         ProjectionExec: expr=[a2@0 as a5]
        RepartitionExec: partitioning=Hash([Column { name: "a2", index: 0 }], 3), input_partitions=1
             UnboundableExec: unbounded=false
       ```
       
       ```
       === Physical plan ===
       CoalescePartitionsExec
      RepartitionExec: partitioning=Hash([Column { name: "a2", index: 0 }], 3), input_partitions=1
           UnboundableExec: unbounded=false
       ```
       
   - Plan without blocking (e.g. `plan.execute(2, task)`; which partitions block can change according to the hash value):
       
       ```
       === Physical plan ===
    RepartitionExec: partitioning=Hash([Column { name: "a2", index: 0 }], 3), input_partitions=1
         UnboundableExec: unbounded=false
       ```
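   
   A toy sketch of why only one output partition carries data here (the hash function below is an illustrative stand-in, not DataFusion's actual `RepartitionExec` hashing): with a single unique key, every row maps to the same output partition, so the remaining output partitions never receive a batch and executing one of them has nothing to poll.
   
   ```rust
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};
    
    /// Map a key to one of `n` output partitions (illustrative only;
    /// DataFusion uses its own hashing internally).
    fn target_partition(value: u32, n: u64) -> u64 {
        let mut h = DefaultHasher::new();
        value.hash(&mut h);
        h.finish() % n
    }
    
    fn main() {
        // Every row carries the same key `1`, so all rows land in ONE
        // partition; the other two output partitions stay empty.
        let p = target_partition(1, 3);
        for _row in 0..5 {
            assert_eq!(target_partition(1, 3), p);
        }
        println!("all rows map to partition {p}");
    }
   ```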
       
   
   **To Reproduce**
   
   - Create a file `datafusion/core/tests/repartition_exec_blocks.rs`
   - Put this code
   
   ```rust
   use arrow::array::UInt32Array;
   use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
   use arrow::record_batch::RecordBatch;
   use arrow::util::pretty::print_batches;
   use datafusion::execution::context::TaskContext;
   use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
   use datafusion::physical_plan::projection::ProjectionExec;
   use datafusion::physical_plan::repartition::RepartitionExec;
   use datafusion::physical_plan::{
        displayable, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
       SendableRecordBatchStream,
   };
   use datafusion::prelude::{SessionConfig, SessionContext};
   use datafusion_common::from_slice::FromSlice;
   use datafusion_common::{Result, Statistics};
   use datafusion_physical_expr::expressions::{col, Column};
   use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
   use futures::{Stream, StreamExt};
   use std::any::Any;
   use std::pin::Pin;
   use std::sync::Arc;
   use std::task::{Context, Poll};
   
    /// A mock execution plan that simply returns the provided data source characteristic
   #[derive(Debug, Clone)]
   pub struct MyUnboundedExec {
       batch_produce: Option<usize>,
       schema: Arc<Schema>,
        /// Ref-counting helper to check if the plan and the produced stream are still in memory.
       refs: Arc<()>,
   }
   impl MyUnboundedExec {
       pub fn new(batch_produce: Option<usize>, schema: Schema) -> Self {
           Self {
               batch_produce,
               schema: Arc::new(schema),
               refs: Default::default(),
           }
       }
   }
   impl ExecutionPlan for MyUnboundedExec {
       fn as_any(&self) -> &dyn Any {
           self
       }
   
       fn schema(&self) -> SchemaRef {
           Arc::clone(&self.schema)
       }
   
       fn output_partitioning(&self) -> Partitioning {
           Partitioning::UnknownPartitioning(1)
       }
   
       fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
           None
       }
   
       fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
           vec![]
       }
   
       fn with_new_children(
           self: Arc<Self>,
           _: Vec<Arc<dyn ExecutionPlan>>,
       ) -> Result<Arc<dyn ExecutionPlan>> {
           Ok(self)
       }
   
       fn execute(
           &self,
           _partition: usize,
           _context: Arc<TaskContext>,
       ) -> Result<SendableRecordBatchStream> {
           Ok(Box::pin(UnboundedStream {
               batch_produce: self.batch_produce,
               count: 0,
               schema: Arc::clone(&self.schema),
               _refs: Arc::clone(&self.refs),
           }))
       }
   
       fn fmt_as(
           &self,
           t: DisplayFormatType,
           f: &mut std::fmt::Formatter,
       ) -> std::fmt::Result {
           match t {
               DisplayFormatType::Default => {
                   write!(
                       f,
                       "UnboundableExec: unbounded={}",
                       self.batch_produce.is_none(),
                   )
               }
           }
       }
   
       fn statistics(&self) -> Statistics {
           Statistics::default()
       }
   }
   
   #[derive(Debug)]
   pub struct UnboundedStream {
       batch_produce: Option<usize>,
       count: usize,
       /// Schema mocked by this stream.
       schema: SchemaRef,
   
        /// Ref-counting helper to check if the stream is still in memory.
       _refs: Arc<()>,
   }
   
   impl Stream for UnboundedStream {
       type Item = Result<RecordBatch>;
   
       fn poll_next(
           mut self: Pin<&mut Self>,
           _cx: &mut Context<'_>,
       ) -> Poll<Option<Self::Item>> {
           if let Some(val) = self.batch_produce {
               if val <= self.count {
                   println!("Stream Finished");
                   return Poll::Ready(None);
               }
           }
           let batch = RecordBatch::try_new(
               self.schema.clone(),
               vec![Arc::new(UInt32Array::from_slice([1]))],
           )?;
        self.count += 1;
        // Simulate a slow source.
        std::thread::sleep(std::time::Duration::from_millis(100));
           Poll::Ready(Some(Ok(batch)))
       }
   }
   
   impl RecordBatchStream for UnboundedStream {
       fn schema(&self) -> SchemaRef {
           Arc::clone(&self.schema)
       }
   }
   
   #[tokio::test(flavor = "multi_thread")]
   async fn unbounded_repartition_sa() -> Result<()> {
       let config = SessionConfig::new();
       let ctx = SessionContext::with_config(config);
       let task = ctx.task_ctx();
        let schema = Schema::new(vec![Field::new("a2", DataType::UInt32, false)]);
        // If you pass None, it will be an unbounded source.
        let input = Arc::new(MyUnboundedExec::new(Some(20), schema.clone()));
        let on: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(Column::new("a2", 0))];
        let plan = Arc::new(RepartitionExec::try_new(input, Partitioning::Hash(on, 3))?);
        let plan = Arc::new(ProjectionExec::try_new(
            vec![(col("a2", &schema)?, "a5".to_string())],
            plan.clone(),
        )?);
       let plan = Arc::new(CoalescePartitionsExec::new(plan.clone()));
       println!(
           "=== Physical plan ===\n{}\n",
           displayable(plan.as_ref()).indent()
       );
       let mut stream = plan.execute(0, task)?;
       while let Some(result) = stream.next().await {
           print_batches(&[result?.clone()])?;
       }
       Ok(())
   }
   ```
   
   **Expected behavior**
   
   ```rust
    //! There are `N` virtual MPSC (multi-producer, single consumer) channels with unbounded capacity. However, if all
    //! buffers/channels are non-empty, then a global gate will be closed preventing new data from being written (the
    //! sender futures will be [pending](Poll::Pending)) until at least one channel is empty (and not closed).
   ```
   
   - Since it does not block the senders, I would expect the waker on the receiver to wake up after the `wake_receivers()` call.
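   
   A minimal sketch of the waker contract this expectation relies on (the names `WakerSlot` and `WaitForData` are made up for illustration; this is not DataFusion code): a pending future must re-register its waker on every poll, and whoever holds the latest stored waker can wake the task, which is what `wake_receivers()` should do for the `DistributionReceiver`.
   
   ```rust
    use std::future::Future;
    use std::pin::Pin;
    use std::sync::{Arc, Mutex};
    use std::task::{Context, Poll, Wake, Waker};
    
    /// Shared slot holding the most recently registered waker.
    struct WakerSlot(Mutex<Option<Waker>>);
    
    /// A future that is pending until `ready` flips; it stores its waker
    /// in the shared slot on every poll, as the async book recommends.
    struct WaitForData {
        ready: Arc<Mutex<bool>>,
        slot: Arc<WakerSlot>,
    }
    
    impl Future for WaitForData {
        type Output = ();
        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
            if *self.ready.lock().unwrap() {
                Poll::Ready(())
            } else {
                // Refresh the stored waker on EVERY poll: the task may have
                // moved, and a stale waker would wake nothing.
                *self.slot.0.lock().unwrap() = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
    
    /// No-op waker, just enough to drive `poll` by hand.
    struct NoopWaker;
    impl Wake for NoopWaker {
        fn wake(self: Arc<Self>) {}
    }
    
    fn main() {
        let ready = Arc::new(Mutex::new(false));
        let slot = Arc::new(WakerSlot(Mutex::new(None)));
        let mut fut = WaitForData { ready: ready.clone(), slot: slot.clone() };
        let waker = Waker::from(Arc::new(NoopWaker));
        let mut cx = Context::from_waker(&waker);
    
        // First poll: no data yet, so the future parks and registers its waker.
        assert!(Pin::new(&mut fut).poll(&mut cx).is_pending());
        assert!(slot.0.lock().unwrap().is_some());
    
        // Producer side: publish data, then wake the *stored* waker.
        *ready.lock().unwrap() = true;
        slot.0.lock().unwrap().take().unwrap().wake();
    
        // The woken task polls again and now completes.
        assert!(Pin::new(&mut fut).poll(&mut cx).is_ready());
        println!("receiver woke up after wake()");
    }
   ```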
   
   
   cc @crepererum @alamb @tustvold 

