This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new ded897ee7 fix: add yield point to `RepartitionExec` (#5299)
ded897ee7 is described below

commit ded897ee7f4944ab5ef266854e000179650b5f07
Author: Marco Neumann <[email protected]>
AuthorDate: Fri Feb 17 14:54:14 2023 +0100

    fix: add yield point to `RepartitionExec` (#5299)
    
    * fix: add yield point to `RepartitionExec`
    
    This prevents endless spinning and locked-up tokio tasks if the inputs
    never return `Poll::Pending`.
    
    Fixes #5278.
    
    * refactor: use a single `UnboundedExec` for testing
    
    * refactor: rename test
---
 .../core/src/physical_optimizer/pipeline_fixer.rs  |  23 +++-
 .../core/src/physical_plan/repartition/mod.rs      |   4 +
 datafusion/core/src/test/exec.rs                   |  70 -----------
 datafusion/core/src/test_util.rs                   | 132 ++++++++++++++++++++-
 datafusion/core/tests/repartition.rs               |  60 ++++++++++
 5 files changed, 211 insertions(+), 78 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/pipeline_fixer.rs b/datafusion/core/src/physical_optimizer/pipeline_fixer.rs
index ab74ad37e..1ca21bb88 100644
--- a/datafusion/core/src/physical_optimizer/pipeline_fixer.rs
+++ b/datafusion/core/src/physical_optimizer/pipeline_fixer.rs
@@ -195,9 +195,11 @@ mod hash_join_tests {
     use crate::physical_optimizer::join_selection::swap_join_type;
     use crate::physical_optimizer::test_utils::SourceType;
     use crate::physical_plan::expressions::Column;
+    use crate::physical_plan::joins::PartitionMode;
     use crate::physical_plan::projection::ProjectionExec;
-    use crate::{physical_plan::joins::PartitionMode, 
test::exec::UnboundedExec};
+    use crate::test_util::UnboundedExec;
     use arrow::datatypes::{DataType, Field, Schema};
+    use arrow::record_batch::RecordBatch;
     use std::sync::Arc;
 
     struct TestCase {
@@ -529,17 +531,28 @@ mod hash_join_tests {
         }
         Ok(())
     }
+
     #[allow(clippy::vtable_address_comparisons)]
     async fn test_join_with_maybe_swap_unbounded_case(t: TestCase) -> 
Result<()> {
         let left_unbounded = t.initial_sources_unbounded.0 == 
SourceType::Unbounded;
         let right_unbounded = t.initial_sources_unbounded.1 == 
SourceType::Unbounded;
         let left_exec = Arc::new(UnboundedExec::new(
-            left_unbounded,
-            Schema::new(vec![Field::new("a", DataType::Int32, false)]),
+            (!left_unbounded).then_some(1),
+            RecordBatch::new_empty(Arc::new(Schema::new(vec![Field::new(
+                "a",
+                DataType::Int32,
+                false,
+            )]))),
+            2,
         )) as Arc<dyn ExecutionPlan>;
         let right_exec = Arc::new(UnboundedExec::new(
-            right_unbounded,
-            Schema::new(vec![Field::new("b", DataType::Int32, false)]),
+            (!right_unbounded).then_some(1),
+            RecordBatch::new_empty(Arc::new(Schema::new(vec![Field::new(
+                "b",
+                DataType::Int32,
+                false,
+            )]))),
+            2,
         )) as Arc<dyn ExecutionPlan>;
 
         let join = HashJoinExec::try_new(
diff --git a/datafusion/core/src/physical_plan/repartition/mod.rs b/datafusion/core/src/physical_plan/repartition/mod.rs
index 072bdfa3e..8a7ce3ce2 100644
--- a/datafusion/core/src/physical_plan/repartition/mod.rs
+++ b/datafusion/core/src/physical_plan/repartition/mod.rs
@@ -532,6 +532,10 @@ impl RepartitionExec {
                 }
                 timer.done();
             }
+
+            // If the input stream is endless, we may spin forever and never 
yield back to tokio. Hence let us yield.
+            // See https://github.com/apache/arrow-datafusion/issues/5278.
+            tokio::task::yield_now().await;
         }
 
         Ok(())
diff --git a/datafusion/core/src/test/exec.rs b/datafusion/core/src/test/exec.rs
index b19173546..bce7d08a5 100644
--- a/datafusion/core/src/test/exec.rs
+++ b/datafusion/core/src/test/exec.rs
@@ -507,76 +507,6 @@ impl ExecutionPlan for StatisticsExec {
     }
 }
 
-/// A mock execution plan that simply returns the provided data source 
characteristic
-#[derive(Debug, Clone)]
-pub struct UnboundedExec {
-    unbounded: bool,
-    schema: Arc<Schema>,
-}
-impl UnboundedExec {
-    pub fn new(unbounded: bool, schema: Schema) -> Self {
-        Self {
-            unbounded,
-            schema: Arc::new(schema),
-        }
-    }
-}
-impl ExecutionPlan for UnboundedExec {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn schema(&self) -> SchemaRef {
-        Arc::clone(&self.schema)
-    }
-
-    fn output_partitioning(&self) -> Partitioning {
-        Partitioning::UnknownPartitioning(2)
-    }
-
-    fn unbounded_output(&self, _children: &[bool]) -> Result<bool> {
-        Ok(self.unbounded)
-    }
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        None
-    }
-
-    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
-        vec![]
-    }
-
-    fn with_new_children(
-        self: Arc<Self>,
-        _: Vec<Arc<dyn ExecutionPlan>>,
-    ) -> Result<Arc<dyn ExecutionPlan>> {
-        Ok(self)
-    }
-
-    fn execute(
-        &self,
-        _partition: usize,
-        _context: Arc<TaskContext>,
-    ) -> Result<SendableRecordBatchStream> {
-        unimplemented!("This plan only serves for testing statistics")
-    }
-
-    fn fmt_as(
-        &self,
-        t: DisplayFormatType,
-        f: &mut std::fmt::Formatter,
-    ) -> std::fmt::Result {
-        match t {
-            DisplayFormatType::Default => {
-                write!(f, "UnboundableExec: unbounded={}", self.unbounded,)
-            }
-        }
-    }
-
-    fn statistics(&self) -> Statistics {
-        Statistics::default()
-    }
-}
-
 /// Execution plan that emits streams that block forever.
 ///
 /// This is useful to test shutdown / cancelation behavior of certain 
execution plans.
diff --git a/datafusion/core/src/test_util.rs b/datafusion/core/src/test_util.rs
index 669a17845..66059d713 100644
--- a/datafusion/core/src/test_util.rs
+++ b/datafusion/core/src/test_util.rs
@@ -19,17 +19,26 @@
 
 use std::any::Any;
 use std::collections::HashMap;
+use std::pin::Pin;
+use std::task::{Context, Poll};
 use std::{env, error::Error, path::PathBuf, sync::Arc};
 
 use crate::datasource::datasource::TableProviderFactory;
 use crate::datasource::{empty::EmptyTable, provider_as_source, TableProvider};
-use crate::execution::context::SessionState;
+use crate::error::Result;
+use crate::execution::context::{SessionState, TaskContext};
 use crate::logical_expr::{LogicalPlanBuilder, UNNAMED_TABLE};
-use crate::physical_plan::ExecutionPlan;
+use crate::physical_plan::{
+    DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
+    SendableRecordBatchStream,
+};
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
-use datafusion_common::DataFusionError;
+use datafusion_common::{DataFusionError, Statistics};
 use datafusion_expr::{CreateExternalTable, Expr, TableType};
+use datafusion_physical_expr::PhysicalSortExpr;
+use futures::Stream;
 
 /// Compares formatted output of a record batch with an expected
 /// vector of strings, with the result of pretty formatting record
@@ -332,6 +341,123 @@ impl TableProvider for TestTableProvider {
     }
 }
 
+/// A mock leaf execution plan that emits clones of one record batch, either `n` times per partition or forever.
+#[derive(Debug, Clone)]
+pub struct UnboundedExec {
+    batch_produce: Option<usize>, // `Some(n)`: n batches per partition; `None`: endless output
+    batch: RecordBatch, // template batch; every emitted item is a clone of it
+    partitions: usize, // partition count returned by `output_partitioning`
+}
+impl UnboundedExec {
+    /// Create new exec that clones the given record batch to its output.
+    ///
+    /// Set `batch_produce` to `Some(n)` to emit exactly `n` batches per partition; `None` makes every partition endless.
+    pub fn new(
+        batch_produce: Option<usize>,
+        batch: RecordBatch,
+        partitions: usize,
+    ) -> Self {
+        Self {
+            batch_produce,
+            batch,
+            partitions,
+        }
+    }
+}
+impl ExecutionPlan for UnboundedExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.batch.schema() // output schema is the template batch's schema
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(self.partitions)
+    }
+
+    fn unbounded_output(&self, _children: &[bool]) -> Result<bool> {
+        Ok(self.batch_produce.is_none()) // unbounded iff no batch limit was configured
+    }
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+        None
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![] // leaf node: no inputs
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(self) // there are no children to replace
+    }
+
+    fn execute(
+        &self,
+        _partition: usize, // not checked: every partition gets an identical stream
+        _context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        Ok(Box::pin(UnboundedStream {
+            batch_produce: self.batch_produce,
+            count: 0,
+            batch: self.batch.clone(),
+        }))
+    }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(
+                    f,
+                    "UnboundableExec: unbounded={}", // NOTE(review): label differs from the type name
+                    self.batch_produce.is_none(),
+                )
+            }
+        }
+    }
+
+    fn statistics(&self) -> Statistics {
+        Statistics::default() // mock data: no statistics are provided
+    }
+}
+
+#[derive(Debug)]
+struct UnboundedStream { // stream returned by `UnboundedExec::execute`
+    batch_produce: Option<usize>, // batch limit copied from the exec; `None` = endless
+    count: usize, // number of batches emitted so far
+    batch: RecordBatch,
+}
+
+impl Stream for UnboundedStream {
+    type Item = Result<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        _cx: &mut Context<'_>, // unused: always `Ready`, never `Pending`, so it never yields on its own
+    ) -> Poll<Option<Self::Item>> {
+        if let Some(val) = self.batch_produce {
+            if val <= self.count {
+                return Poll::Ready(None); // limit reached: end of stream
+            }
+        }
+        self.count += 1;
+        Poll::Ready(Some(Ok(self.batch.clone())))
+    }
+}
+
+impl RecordBatchStream for UnboundedStream {
+    fn schema(&self) -> SchemaRef {
+        self.batch.schema()
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/datafusion/core/tests/repartition.rs b/datafusion/core/tests/repartition.rs
new file mode 100644
index 000000000..4fd3a9125
--- /dev/null
+++ b/datafusion/core/tests/repartition.rs
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::UInt32Array;
+use arrow::datatypes::{DataType, Field, Schema};
+use arrow::record_batch::RecordBatch;
+use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use datafusion::physical_plan::repartition::RepartitionExec;
+use datafusion::physical_plan::{ExecutionPlan, Partitioning};
+use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion::test_util::UnboundedExec;
+use datafusion_common::from_slice::FromSlice;
+use datafusion_common::Result;
+use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr::PhysicalExpr;
+use futures::StreamExt;
+use std::sync::Arc;
+
+/// Regression test: an endless input to `RepartitionExec` must not starve the runtime, see <https://github.com/apache/arrow-datafusion/issues/5278>
+#[tokio::test]
+async fn unbounded_repartition() -> Result<()> {
+    let config = SessionConfig::new();
+    let ctx = SessionContext::with_config(config);
+    let task = ctx.task_ctx();
+    let schema = Arc::new(Schema::new(vec![Field::new("a2", DataType::UInt32, false)]));
+    let batch = RecordBatch::try_new(
+        Arc::clone(&schema),
+        vec![Arc::new(UInt32Array::from_slice([1]))],
+    )?;
+    let input = Arc::new(UnboundedExec::new(None, batch.clone(), 1)); // endless, never `Pending`
+    let on: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(Column::new("a2", 0))];
+    let plan = Arc::new(RepartitionExec::try_new(input, Partitioning::Hash(on, 3))?);
+    let plan = Arc::new(CoalescePartitionsExec::new(plan));
+    let mut stream = plan.execute(0, task)?;
+
+    // Note: `tokio::time::timeout` does NOT help here because in the mentioned issue, the whole runtime is blocked by a
+    // CPU-spinning thread. Using a multithread runtime with multiple threads is NOT a solution since this would not
+    // trigger the bug (the bug is not specific to a single-thread RT though, it's just the only way to trigger it reliably).
+    let batch_actual = stream
+        .next()
+        .await
+        .expect("not terminated")
+        .expect("no error in stream");
+    assert_eq!(batch_actual, batch);
+    Ok(())
+}

Reply via email to