This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new ded897ee7 fix: add yield point to `RepartitionExec` (#5299)
ded897ee7 is described below
commit ded897ee7f4944ab5ef266854e000179650b5f07
Author: Marco Neumann <[email protected]>
AuthorDate: Fri Feb 17 14:54:14 2023 +0100
fix: add yield point to `RepartitionExec` (#5299)
* fix: add yield point to `RepartitionExec`
This prevents endless spinning and locked up tokio tasks if the inputs
never yield `pending`.
Fixes #5278.
* refactor: use a single `UnboundedExec` for testing
* refactor: rename test
---
.../core/src/physical_optimizer/pipeline_fixer.rs | 23 +++-
.../core/src/physical_plan/repartition/mod.rs | 4 +
datafusion/core/src/test/exec.rs | 70 -----------
datafusion/core/src/test_util.rs | 132 ++++++++++++++++++++-
datafusion/core/tests/repartition.rs | 60 ++++++++++
5 files changed, 211 insertions(+), 78 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/pipeline_fixer.rs b/datafusion/core/src/physical_optimizer/pipeline_fixer.rs
index ab74ad37e..1ca21bb88 100644
--- a/datafusion/core/src/physical_optimizer/pipeline_fixer.rs
+++ b/datafusion/core/src/physical_optimizer/pipeline_fixer.rs
@@ -195,9 +195,11 @@ mod hash_join_tests {
use crate::physical_optimizer::join_selection::swap_join_type;
use crate::physical_optimizer::test_utils::SourceType;
use crate::physical_plan::expressions::Column;
+ use crate::physical_plan::joins::PartitionMode;
use crate::physical_plan::projection::ProjectionExec;
- use crate::{physical_plan::joins::PartitionMode, test::exec::UnboundedExec};
+ use crate::test_util::UnboundedExec;
use arrow::datatypes::{DataType, Field, Schema};
+ use arrow::record_batch::RecordBatch;
use std::sync::Arc;
struct TestCase {
@@ -529,17 +531,28 @@ mod hash_join_tests {
}
Ok(())
}
+
#[allow(clippy::vtable_address_comparisons)]
async fn test_join_with_maybe_swap_unbounded_case(t: TestCase) -> Result<()> {
let left_unbounded = t.initial_sources_unbounded.0 == SourceType::Unbounded;
let right_unbounded = t.initial_sources_unbounded.1 == SourceType::Unbounded;
let left_exec = Arc::new(UnboundedExec::new(
- left_unbounded,
- Schema::new(vec![Field::new("a", DataType::Int32, false)]),
+ (!left_unbounded).then_some(1),
+ RecordBatch::new_empty(Arc::new(Schema::new(vec![Field::new(
+ "a",
+ DataType::Int32,
+ false,
+ )]))),
+ 2,
)) as Arc<dyn ExecutionPlan>;
let right_exec = Arc::new(UnboundedExec::new(
- right_unbounded,
- Schema::new(vec![Field::new("b", DataType::Int32, false)]),
+ (!right_unbounded).then_some(1),
+ RecordBatch::new_empty(Arc::new(Schema::new(vec![Field::new(
+ "b",
+ DataType::Int32,
+ false,
+ )]))),
+ 2,
)) as Arc<dyn ExecutionPlan>;
let join = HashJoinExec::try_new(
diff --git a/datafusion/core/src/physical_plan/repartition/mod.rs b/datafusion/core/src/physical_plan/repartition/mod.rs
index 072bdfa3e..8a7ce3ce2 100644
--- a/datafusion/core/src/physical_plan/repartition/mod.rs
+++ b/datafusion/core/src/physical_plan/repartition/mod.rs
@@ -532,6 +532,10 @@ impl RepartitionExec {
}
timer.done();
}
+
+ // If the input stream is endless, we may spin forever and never yield back to tokio. Hence let us yield.
+ // See https://github.com/apache/arrow-datafusion/issues/5278.
+ tokio::task::yield_now().await;
}
Ok(())
diff --git a/datafusion/core/src/test/exec.rs b/datafusion/core/src/test/exec.rs
index b19173546..bce7d08a5 100644
--- a/datafusion/core/src/test/exec.rs
+++ b/datafusion/core/src/test/exec.rs
@@ -507,76 +507,6 @@ impl ExecutionPlan for StatisticsExec {
}
}
-/// A mock execution plan that simply returns the provided data source characteristic
-#[derive(Debug, Clone)]
-pub struct UnboundedExec {
- unbounded: bool,
- schema: Arc<Schema>,
-}
-impl UnboundedExec {
- pub fn new(unbounded: bool, schema: Schema) -> Self {
- Self {
- unbounded,
- schema: Arc::new(schema),
- }
- }
-}
-impl ExecutionPlan for UnboundedExec {
- fn as_any(&self) -> &dyn Any {
- self
- }
-
- fn schema(&self) -> SchemaRef {
- Arc::clone(&self.schema)
- }
-
- fn output_partitioning(&self) -> Partitioning {
- Partitioning::UnknownPartitioning(2)
- }
-
- fn unbounded_output(&self, _children: &[bool]) -> Result<bool> {
- Ok(self.unbounded)
- }
- fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- None
- }
-
- fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
- vec![]
- }
-
- fn with_new_children(
- self: Arc<Self>,
- _: Vec<Arc<dyn ExecutionPlan>>,
- ) -> Result<Arc<dyn ExecutionPlan>> {
- Ok(self)
- }
-
- fn execute(
- &self,
- _partition: usize,
- _context: Arc<TaskContext>,
- ) -> Result<SendableRecordBatchStream> {
- unimplemented!("This plan only serves for testing statistics")
- }
-
- fn fmt_as(
- &self,
- t: DisplayFormatType,
- f: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- match t {
- DisplayFormatType::Default => {
- write!(f, "UnboundableExec: unbounded={}", self.unbounded,)
- }
- }
- }
-
- fn statistics(&self) -> Statistics {
- Statistics::default()
- }
-}
-
/// Execution plan that emits streams that block forever.
///
/// This is useful to test shutdown / cancelation behavior of certain execution plans.
diff --git a/datafusion/core/src/test_util.rs b/datafusion/core/src/test_util.rs
index 669a17845..66059d713 100644
--- a/datafusion/core/src/test_util.rs
+++ b/datafusion/core/src/test_util.rs
@@ -19,17 +19,26 @@
use std::any::Any;
use std::collections::HashMap;
+use std::pin::Pin;
+use std::task::{Context, Poll};
use std::{env, error::Error, path::PathBuf, sync::Arc};
use crate::datasource::datasource::TableProviderFactory;
use crate::datasource::{empty::EmptyTable, provider_as_source, TableProvider};
-use crate::execution::context::SessionState;
+use crate::error::Result;
+use crate::execution::context::{SessionState, TaskContext};
use crate::logical_expr::{LogicalPlanBuilder, UNNAMED_TABLE};
-use crate::physical_plan::ExecutionPlan;
+use crate::physical_plan::{
+ DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
+ SendableRecordBatchStream,
+};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
-use datafusion_common::DataFusionError;
+use datafusion_common::{DataFusionError, Statistics};
use datafusion_expr::{CreateExternalTable, Expr, TableType};
+use datafusion_physical_expr::PhysicalSortExpr;
+use futures::Stream;
/// Compares formatted output of a record batch with an expected
/// vector of strings, with the result of pretty formatting record
@@ -332,6 +341,123 @@ impl TableProvider for TestTableProvider {
}
}
+/// A mock execution plan that simply returns the provided data source characteristic
+#[derive(Debug, Clone)]
+pub struct UnboundedExec {
+ batch_produce: Option<usize>,
+ batch: RecordBatch,
+ partitions: usize,
+}
+impl UnboundedExec {
+ /// Create new exec that clones the given record batch to its output.
+ ///
+ /// Set `batch_produce` to `Some(n)` to emit exactly `n` batches per partition.
+ pub fn new(
+ batch_produce: Option<usize>,
+ batch: RecordBatch,
+ partitions: usize,
+ ) -> Self {
+ Self {
+ batch_produce,
+ batch,
+ partitions,
+ }
+ }
+}
+impl ExecutionPlan for UnboundedExec {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> SchemaRef {
+ self.batch.schema()
+ }
+
+ fn output_partitioning(&self) -> Partitioning {
+ Partitioning::UnknownPartitioning(self.partitions)
+ }
+
+ fn unbounded_output(&self, _children: &[bool]) -> Result<bool> {
+ Ok(self.batch_produce.is_none())
+ }
+ fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+ None
+ }
+
+ fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+ vec![]
+ }
+
+ fn with_new_children(
+ self: Arc<Self>,
+ _: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ Ok(self)
+ }
+
+ fn execute(
+ &self,
+ _partition: usize,
+ _context: Arc<TaskContext>,
+ ) -> Result<SendableRecordBatchStream> {
+ Ok(Box::pin(UnboundedStream {
+ batch_produce: self.batch_produce,
+ count: 0,
+ batch: self.batch.clone(),
+ }))
+ }
+
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default => {
+ write!(
+ f,
+ "UnboundableExec: unbounded={}",
+ self.batch_produce.is_none(),
+ )
+ }
+ }
+ }
+
+ fn statistics(&self) -> Statistics {
+ Statistics::default()
+ }
+}
+
+#[derive(Debug)]
+struct UnboundedStream {
+ batch_produce: Option<usize>,
+ count: usize,
+ batch: RecordBatch,
+}
+
+impl Stream for UnboundedStream {
+ type Item = Result<RecordBatch>;
+
+ fn poll_next(
+ mut self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
+ if let Some(val) = self.batch_produce {
+ if val <= self.count {
+ return Poll::Ready(None);
+ }
+ }
+ self.count += 1;
+ Poll::Ready(Some(Ok(self.batch.clone())))
+ }
+}
+
+impl RecordBatchStream for UnboundedStream {
+ fn schema(&self) -> SchemaRef {
+ self.batch.schema()
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
diff --git a/datafusion/core/tests/repartition.rs b/datafusion/core/tests/repartition.rs
new file mode 100644
index 000000000..4fd3a9125
--- /dev/null
+++ b/datafusion/core/tests/repartition.rs
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::UInt32Array;
+use arrow::datatypes::{DataType, Field, Schema};
+use arrow::record_batch::RecordBatch;
+use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use datafusion::physical_plan::repartition::RepartitionExec;
+use datafusion::physical_plan::{ExecutionPlan, Partitioning};
+use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion::test_util::UnboundedExec;
+use datafusion_common::from_slice::FromSlice;
+use datafusion_common::Result;
+use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr::PhysicalExpr;
+use futures::StreamExt;
+use std::sync::Arc;
+
+/// See <https://github.com/apache/arrow-datafusion/issues/5278>
+#[tokio::test]
+async fn unbounded_repartition() -> Result<()> {
+ let config = SessionConfig::new();
+ let ctx = SessionContext::with_config(config);
+ let task = ctx.task_ctx();
+ let schema = Arc::new(Schema::new(vec![Field::new("a2", DataType::UInt32, false)]));
+ let batch = RecordBatch::try_new(
+ Arc::clone(&schema),
+ vec![Arc::new(UInt32Array::from_slice([1]))],
+ )?;
+ let input = Arc::new(UnboundedExec::new(None, batch.clone(), 1));
+ let on: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(Column::new("a2", 0))];
+ let plan = Arc::new(RepartitionExec::try_new(input, Partitioning::Hash(on, 3))?);
+ let plan = Arc::new(CoalescePartitionsExec::new(plan.clone()));
+ let mut stream = plan.execute(0, task)?;
+
+ // Note: `tokio::time::timeout` does NOT help here because in the mentioned issue, the whole runtime is blocked by a
+ // CPU-spinning thread. Using a multithread runtime with multiple threads is NOT a solution since this would not
+ // trigger the bug (the bug is not specific to a single-thread RT though, it's just the only way to trigger it reliably).
+ let batch_actual = stream
+ .next()
+ .await
+ .expect("not terminated")
+ .expect("no error in stream");
+ assert_eq!(batch_actual, batch);
+ Ok(())
+}