metesynnada opened a new issue, #5278:
URL: https://github.com/apache/arrow-datafusion/issues/5278
**Describe the bug**
I am working with
```
=== Physical plan ===
CoalescePartitionsExec
RepartitionExec: partitioning=Hash([Column { name: "a2", index: 0 }], 3)
UnboundableExec: unbounded=true
```
plan. Combining **`CoalescePartitionsExec`** and **`RepartitionExec`** in a
physical plan causes strange behavior when the input stream contains only one
unique value.
The strange behavior:
- When the input stream contains only one unique value, no data is read from
**`RepartitionExec`** until the stream is exhausted. Calling
**`wake_receivers()`** does not wake up the **`DistributionReceiver`**. This
behavior is not observed without **`CoalescePartitionsExec`**. I suspect that
the problem is caused by the spawning of new threads inside
**`CoalescePartitionsExec`**, so the waker needs to be updated. See
https://rust-lang.github.io/async-book/02_execution/03_wakeups.html
- Plans with blocking repartition:
```
=== Physical plan ===
CoalescePartitionsExec
ProjectionExec: expr=[a2@0 as a5]
RepartitionExec: partitioning=Hash([Column { name: "a2", index: 0 }], 3), input_partitions=1
UnboundableExec: unbounded=false
```
```
=== Physical plan ===
CoalescePartitionsExec
RepartitionExec: partitioning=Hash([Column { name: "a2", index: 0 }], 3), input_partitions=1
UnboundableExec: unbounded=false
```
- Plan without blocking (using `plan.execute(2, task)`; the partition index
can change according to the hash value):
```
=== Physical plan ===
RepartitionExec: partitioning=Hash([Column { name: "a2", index: 0 }], 3), input_partitions=1
UnboundableExec: unbounded=false
```
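For reference, the wake-up contract that the suspicion above relies on can be sketched with the standard library alone. This is a minimal illustration and not DataFusion's implementation: `Shared`, `Recv`, `send`, `ThreadWaker`, `block_on` and `run_demo` are hypothetical names introduced here. The point it shows is that a pending receiver must store the waker from the most recent `poll`, and the producer must call `wake()` on that stored waker after pushing data.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::Duration;

/// State shared between the producer thread and the receiving future.
#[derive(Default)]
struct Shared {
    queue: VecDeque<u32>,
    closed: bool,
    /// Waker of the task that most recently polled the receiver.
    waker: Option<Waker>,
}

/// Hypothetical receiver: resolves to the next value, or `None` once closed.
struct Recv(Arc<Mutex<Shared>>);

impl Future for Recv {
    type Output = Option<u32>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut shared = self.0.lock().unwrap();
        if let Some(v) = shared.queue.pop_front() {
            return Poll::Ready(Some(v));
        }
        if shared.closed {
            return Poll::Ready(None);
        }
        // Re-register on every poll so the latest task's waker gets notified.
        shared.waker = Some(cx.waker().clone());
        Poll::Pending
    }
}

/// Producer side: push a value (or close on `None`), then wake the waiter.
fn send(shared: &Arc<Mutex<Shared>>, value: Option<u32>) {
    let waker = {
        let mut s = shared.lock().unwrap();
        match value {
            Some(v) => s.queue.push_back(v),
            None => s.closed = true,
        }
        s.waker.take()
    };
    // Wake outside the lock; without this call the receiver stays pending.
    if let Some(w) = waker {
        w.wake();
    }
}

/// Tiny executor: parks the current thread until the waker unparks it.
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

/// Runs a producer thread and collects everything the receiver sees.
fn run_demo() -> Vec<u32> {
    let shared = Arc::new(Mutex::new(Shared::default()));
    let producer = {
        let shared = Arc::clone(&shared);
        thread::spawn(move || {
            for v in 1..=3 {
                thread::sleep(Duration::from_millis(10));
                send(&shared, Some(v));
            }
            send(&shared, None);
        })
    };
    let mut received = Vec::new();
    while let Some(v) = block_on(Recv(Arc::clone(&shared))) {
        received.push(v);
    }
    producer.join().unwrap();
    received
}

fn main() {
    println!("{:?}", run_demo());
}
```

If the stored waker belonged to a task that is no longer the one polling the receiver, `wake()` would notify the wrong task, which is the kind of stale-waker situation suspected here.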
**To Reproduce**
- Create a file `datafusion/core/tests/repartition_exec_blocks.rs`
- Put this code
```rust
use arrow::array::UInt32Array;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::print_batches;
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::{
displayable, DisplayFormatType, ExecutionPlan, Partitioning,
RecordBatchStream,
SendableRecordBatchStream,
};
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_common::from_slice::FromSlice;
use datafusion_common::{Result, Statistics};
use datafusion_physical_expr::expressions::{col, Column};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
use futures::{Stream, StreamExt};
use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
/// A mock execution plan that simply returns the provided data source characteristic
#[derive(Debug, Clone)]
pub struct MyUnboundedExec {
batch_produce: Option<usize>,
schema: Arc<Schema>,
/// Ref-counting helper to check if the plan and the produced stream are still in memory.
refs: Arc<()>,
}
impl MyUnboundedExec {
pub fn new(batch_produce: Option<usize>, schema: Schema) -> Self {
Self {
batch_produce,
schema: Arc::new(schema),
refs: Default::default(),
}
}
}
impl ExecutionPlan for MyUnboundedExec {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
fn output_partitioning(&self) -> Partitioning {
Partitioning::UnknownPartitioning(1)
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
None
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![]
}
fn with_new_children(
self: Arc<Self>,
_: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(self)
}
fn execute(
&self,
_partition: usize,
_context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
Ok(Box::pin(UnboundedStream {
batch_produce: self.batch_produce,
count: 0,
schema: Arc::clone(&self.schema),
_refs: Arc::clone(&self.refs),
}))
}
fn fmt_as(
&self,
t: DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
DisplayFormatType::Default => {
write!(
f,
"UnboundableExec: unbounded={}",
self.batch_produce.is_none(),
)
}
}
}
fn statistics(&self) -> Statistics {
Statistics::default()
}
}
#[derive(Debug)]
pub struct UnboundedStream {
batch_produce: Option<usize>,
count: usize,
/// Schema mocked by this stream.
schema: SchemaRef,
/// Ref-counting helper to check if the stream is still in memory.
_refs: Arc<()>,
}
impl Stream for UnboundedStream {
type Item = Result<RecordBatch>;
fn poll_next(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
if let Some(val) = self.batch_produce {
if val <= self.count {
println!("Stream Finished");
return Poll::Ready(None);
}
}
let batch = RecordBatch::try_new(
self.schema.clone(),
vec![Arc::new(UInt32Array::from_slice([1]))],
)?;
self.count += 1;
std::thread::sleep(std::time::Duration::from_millis(100));
Poll::Ready(Some(Ok(batch)))
}
}
impl RecordBatchStream for UnboundedStream {
fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
}
#[tokio::test(flavor = "multi_thread")]
async fn unbounded_repartition_sa() -> Result<()> {
let config = SessionConfig::new();
let ctx = SessionContext::with_config(config);
let task = ctx.task_ctx();
let schema = Schema::new(vec![Field::new("a2", DataType::UInt32, false)]);
// If you put None, it will be an unbounded source.
let input = Arc::new(MyUnboundedExec::new(Some(20), schema.clone()));
let on: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(Column::new("a2", 0))];
let plan = Arc::new(RepartitionExec::try_new(input, Partitioning::Hash(on, 3))?);
let plan = Arc::new(ProjectionExec::try_new(
vec![(col("a2", &schema)?, "a5".to_string())],
plan.clone(),
)?);
let plan = Arc::new(CoalescePartitionsExec::new(plan.clone()));
println!(
"=== Physical plan ===\n{}\n",
displayable(plan.as_ref()).indent()
);
let mut stream = plan.execute(0, task)?;
while let Some(result) = stream.next().await {
print_batches(&[result?.clone()])?;
}
Ok(())
}
```
**Expected behavior**
```rust
//! There are `N` virtual MPSC (multi-producer, single consumer) channels with
//! unbounded capacity. However, if all buffers/channels are non-empty, than a
//! global gate will be closed preventing new data from being written (the
//! sender futures will be [pending](Poll::Pending)) until at least one channel
//! is empty (and not closed).
```
- Since the gate does not block the senders in this case, I would expect the
receiver's waker to be woken after the `wake_receivers()` call.
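
To make the "does not block the senders" reasoning concrete, here is a toy, synchronous model of the gate rule quoted above. It is only a sketch under stated assumptions: `GatedChannels`, `gate_closed`, `try_send` and `send_single_key` are hypothetical names, and the model ignores closed channels, receivers draining data, and all async machinery. It shows that when every row hashes to the same channel out of three, the other two stay empty and the gate never closes.

```rust
use std::collections::VecDeque;

/// Toy model: N queues guarded by one global gate.
struct GatedChannels {
    queues: Vec<VecDeque<u32>>,
}

impl GatedChannels {
    fn new(n: usize) -> Self {
        Self { queues: vec![VecDeque::new(); n] }
    }

    /// The gate is closed only when every queue holds at least one item.
    fn gate_closed(&self) -> bool {
        self.queues.iter().all(|q| !q.is_empty())
    }

    /// Returns `false` when the gate is closed (a real sender would be pending).
    fn try_send(&mut self, channel: usize, value: u32) -> bool {
        if self.gate_closed() {
            return false;
        }
        self.queues[channel].push_back(value);
        true
    }
}

/// Sends `count` values, all routed to `target`, and returns how many were accepted.
fn send_single_key(n_channels: usize, target: usize, count: usize) -> usize {
    let mut channels = GatedChannels::new(n_channels);
    (0..count).filter(|_| channels.try_send(target, 1)).count()
}

fn main() {
    // Three channels, every row lands in channel 2: the gate stays open.
    println!("accepted = {}", send_single_key(3, 2, 20));
    // A single channel: the gate closes after the first item.
    println!("accepted = {}", send_single_key(1, 0, 20));
}
```

In this model the single-key, three-channel case accepts all 20 sends, so any stall in that case would have to come from the receiving side not being woken rather than from the gate.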
**Additional context**
cc @crepererum @alamb @tustvold
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.