alamb commented on code in PR #8840:
URL: https://github.com/apache/arrow-datafusion/pull/8840#discussion_r1468094224
##########
datafusion/core/src/datasource/cte_worktable.rs:
##########
@@ -84,7 +82,11 @@ impl TableProvider for CteWorkTable {
_filters: &[Expr],
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
- not_impl_err!("scan not implemented for CteWorkTable yet")
+ // TODO: pushdown filters and limits
Review Comment:
I recommend tracking this separate tickets or tasks on the main recursive
CTE ticket, otherwise things like this can be easily forgotten
##########
datafusion/physical-plan/src/work_table.rs:
##########
@@ -0,0 +1,193 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines the work table query plan
+
+use std::any::Any;
+use std::sync::{Arc, Mutex};
+
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use datafusion_execution::TaskContext;
+use datafusion_physical_expr::Partitioning;
+
+use crate::memory::MemoryStream;
+use crate::{DisplayAs, DisplayFormatType, ExecutionPlan};
+
+use super::expressions::PhysicalSortExpr;
+
+use super::{
+ metrics::{ExecutionPlanMetricsSet, MetricsSet},
+ SendableRecordBatchStream, Statistics,
+};
+use datafusion_common::{DataFusionError, Result};
+
+/// The name is from PostgreSQL's terminology.
+/// See <https://wiki.postgresql.org/wiki/CTEReadme#How_Recursion_Works>
+/// This table serves as a mirror or buffer between each iteration of a
recursive query.
+#[derive(Debug)]
+pub(super) struct WorkTable {
+ batches: Mutex<Option<Vec<RecordBatch>>>,
+}
+
+impl WorkTable {
+ /// Create a new work table.
+ pub(super) fn new() -> Self {
+ Self {
+ batches: Mutex::new(None),
+ }
+ }
+
+ /// Take the previously written batches from the work table.
+ /// This will be called by the [`WorkTableExec`] when it is executed.
+ fn take(&self) -> Vec<RecordBatch> {
+ let batches = self.batches.lock().unwrap().take().unwrap_or_default();
+ batches
+ }
+
+ /// Write the results of a recursive query iteration to the work table.
+ pub(super) fn write(&self, input: Vec<RecordBatch>) {
+ self.batches.lock().unwrap().replace(input);
+ }
+}
+
+/// A temporary "working table" operation where the input data will be
+/// taken from the named handle during the execution and will be re-published
+/// as is (kind of like a mirror).
+///
+/// Most notably used in the implementation of recursive queries where the
+/// underlying relation does not exist yet but the data will come as the
previous
+/// term is evaluated. This table will be used such that the recursive plan
+/// will register a receiver in the task context and this plan will use that
+/// receiver to get the data and stream it back up so that the batches are
available
+/// in the next iteration.
+#[derive(Clone, Debug)]
+pub struct WorkTableExec {
+ /// Name of the relation handler
+ name: String,
+ /// The schema of the stream
+ schema: SchemaRef,
+ /// The work table
+ work_table: Arc<WorkTable>,
+ /// Execution metrics
+ metrics: ExecutionPlanMetricsSet,
+}
+
+impl WorkTableExec {
+ /// Create a new execution plan for a worktable exec.
+ pub fn new(name: String, schema: SchemaRef) -> Self {
+ Self {
+ name,
+ schema,
+ metrics: ExecutionPlanMetricsSet::new(),
+ work_table: Arc::new(WorkTable::new()),
+ }
+ }
+
+ pub(super) fn with_work_table(&self, work_table: Arc<WorkTable>) -> Self {
+ Self {
+ name: self.name.clone(),
+ schema: self.schema.clone(),
+ metrics: ExecutionPlanMetricsSet::new(),
+ work_table,
+ }
+ }
+}
+
+impl DisplayAs for WorkTableExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(f, "WorkTableExec: name={}", self.name)
+ }
+ }
+ }
+}
+
+impl ExecutionPlan for WorkTableExec {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> SchemaRef {
+ self.schema.clone()
+ }
+
+ fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+ vec![]
+ }
+
+ fn output_partitioning(&self) -> Partitioning {
+ Partitioning::UnknownPartitioning(1)
+ }
+
+ fn maintains_input_order(&self) -> Vec<bool> {
+ vec![false]
+ }
+
+ fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+ vec![false]
+ }
+
+ fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+ None
+ }
+
+ fn with_new_children(
+ self: Arc<Self>,
+ _: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ Ok(self.clone())
+ }
+
+ /// Stream the batches that were written to the work table.
+ fn execute(
+ &self,
+ partition: usize,
+ _context: Arc<TaskContext>,
+ ) -> Result<SendableRecordBatchStream> {
+ // WorkTable streams must be the plan base.
+ if partition != 0 {
+ return Err(DataFusionError::Internal(format!(
+ "WorkTableExec got an invalid partition {} (expected 0)",
+ partition
+ )));
Review Comment:
You can also use the `internal_err!` macro here too if you want
```suggestion
return internal_err!("WorkTableExec got an invalid partition
{partition} (expected 0)");
```
##########
datafusion/physical-plan/src/recursive_query.rs:
##########
@@ -0,0 +1,373 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines the recursive query plan
+
+use std::any::Any;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use super::expressions::PhysicalSortExpr;
+use super::metrics::BaselineMetrics;
+use super::RecordBatchStream;
+use super::{
+ metrics::{ExecutionPlanMetricsSet, MetricsSet},
+ work_table::{WorkTable, WorkTableExec},
+ SendableRecordBatchStream, Statistics,
+};
+
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use datafusion_common::tree_node::{Transformed, TreeNode};
+use datafusion_common::{not_impl_err, DataFusionError, Result};
+use datafusion_execution::TaskContext;
+use datafusion_physical_expr::Partitioning;
+use futures::{ready, Stream, StreamExt};
+
+use crate::{DisplayAs, DisplayFormatType, ExecutionPlan};
+
+/// Recursive query execution plan.
+///
+/// This plan has two components: a base part (the static term) and
+/// a dynamic part (the recursive term). The execution will start from
+/// the base, and as long as the previous iteration produced at least
+/// a single new row (taking care of the distinction) the recursive
+/// part will be continuously executed.
+///
+/// Before each execution of the dynamic part, the rows from the previous
+/// iteration will be available in a "working table" (not a real table,
+/// can be only accessed using a continuance operation).
+///
+/// Note that there won't be any limit or checks applied to detect
+/// an infinite recursion, so it is up to the planner to ensure that
+/// it won't happen.
+#[derive(Debug)]
+pub struct RecursiveQueryExec {
+ /// Name of the query handler
+ name: String,
+ /// The working table of cte
+ work_table: Arc<WorkTable>,
+ /// The base part (static term)
+ static_term: Arc<dyn ExecutionPlan>,
+ /// The dynamic part (recursive term)
+ recursive_term: Arc<dyn ExecutionPlan>,
+ /// Distinction
+ is_distinct: bool,
+ /// Execution metrics
+ metrics: ExecutionPlanMetricsSet,
+}
+
+impl RecursiveQueryExec {
+ /// Create a new RecursiveQueryExec
+ pub fn try_new(
+ name: String,
+ static_term: Arc<dyn ExecutionPlan>,
+ recursive_term: Arc<dyn ExecutionPlan>,
+ is_distinct: bool,
+ ) -> Result<Self> {
+ // Each recursive query needs its own work table
+ let work_table = Arc::new(WorkTable::new());
+ // Use the same work table for both the WorkTableExec and the
recursive term
+ let recursive_term = assign_work_table(recursive_term,
work_table.clone())?;
+ Ok(RecursiveQueryExec {
+ name,
+ static_term,
+ recursive_term,
+ is_distinct,
+ work_table,
+ metrics: ExecutionPlanMetricsSet::new(),
+ })
+ }
+}
+
+impl ExecutionPlan for RecursiveQueryExec {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> SchemaRef {
+ self.static_term.schema()
+ }
+
+ fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+ vec![self.static_term.clone(), self.recursive_term.clone()]
+ }
+
+ // Distribution on a recursive query is really tricky to handle.
+ // For now, we are going to use a single partition but in the
+ // future we might find a better way to handle this.
+ fn output_partitioning(&self) -> Partitioning {
+ Partitioning::UnknownPartitioning(1)
+ }
+
+ // TODO: control these hints and see whether we can
+ // infer some from the child plans (static/recurisve terms).
+ fn maintains_input_order(&self) -> Vec<bool> {
+ vec![false, false]
+ }
+
+ fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+ vec![false, false]
+ }
+
+ fn required_input_distribution(&self) ->
Vec<datafusion_physical_expr::Distribution> {
+ vec![
+ datafusion_physical_expr::Distribution::SinglePartition,
+ datafusion_physical_expr::Distribution::SinglePartition,
+ ]
+ }
+
+ fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+ None
+ }
+
+ fn with_new_children(
+ self: Arc<Self>,
+ children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ Ok(Arc::new(RecursiveQueryExec {
+ name: self.name.clone(),
+ static_term: children[0].clone(),
+ recursive_term: children[1].clone(),
+ is_distinct: self.is_distinct,
+ work_table: self.work_table.clone(),
+ metrics: self.metrics.clone(),
+ }))
+ }
+
+ fn execute(
+ &self,
+ partition: usize,
+ context: Arc<TaskContext>,
+ ) -> Result<SendableRecordBatchStream> {
+ // TODO: we might be able to handle multiple partitions in the future.
+ if partition != 0 {
+ return Err(DataFusionError::Internal(format!(
+ "RecursiveQueryExec got an invalid partition {} (expected 0)",
+ partition
+ )));
+ }
+
+ let static_stream = self.static_term.execute(partition,
context.clone())?;
+ let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
+ Ok(Box::pin(RecursiveQueryStream::new(
+ context,
+ self.work_table.clone(),
+ self.recursive_term.clone(),
+ static_stream,
+ baseline_metrics,
+ )))
+ }
+
+ fn metrics(&self) -> Option<MetricsSet> {
+ Some(self.metrics.clone_inner())
+ }
+
+ fn statistics(&self) -> Result<Statistics> {
+ Ok(Statistics::new_unknown(&self.schema()))
+ }
+}
+
+impl DisplayAs for RecursiveQueryExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(f, "RecursiveQueryExec: is_distinct={}",
self.is_distinct)
Review Comment:
maybe is is worth printing `self.name` here as well
##########
datafusion/sqllogictest/test_files/cte.slt:
##########
@@ -38,3 +38,616 @@ Projection: NUMBERS.a, NUMBERS.b, NUMBERS.c
physical_plan
ProjectionExec: expr=[1 as a, 2 as b, 3 as c]
--PlaceholderRowExec
+
+
+
+# enable recursive CTEs
+statement ok
+set datafusion.execution.enable_recursive_ctes = true;
+
+# trivial recursive CTE works
+query I rowsort
+WITH RECURSIVE nodes AS (
+ SELECT 1 as id
+ UNION ALL
+ SELECT id + 1 as id
+ FROM nodes
+ WHERE id < 10
+)
+SELECT * FROM nodes
+----
+1
+10
+2
+3
+4
+5
+6
+7
+8
+9
+
+# explain trivial recursive CTE
+query TT
+EXPLAIN WITH RECURSIVE nodes AS (
+ SELECT 1 as id
+ UNION ALL
+ SELECT id + 1 as id
+ FROM nodes
+ WHERE id < 10
+)
+SELECT * FROM nodes
+----
+logical_plan
+Projection: nodes.id
+--SubqueryAlias: nodes
+----RecursiveQuery: is_distinct=false
+------Projection: Int64(1) AS id
+--------EmptyRelation
+------Projection: nodes.id + Int64(1) AS id
+--------Filter: nodes.id < Int64(10)
+----------TableScan: nodes
+physical_plan
+RecursiveQueryExec: is_distinct=false
+--ProjectionExec: expr=[1 as id]
+----PlaceholderRowExec
+--CoalescePartitionsExec
+----ProjectionExec: expr=[id@0 + 1 as id]
+------CoalesceBatchesExec: target_batch_size=8192
+--------FilterExec: id@0 < 10
+----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+------------WorkTableExec: name=nodes
+
+# setup
+statement ok
+CREATE EXTERNAL TABLE balance STORED as CSV WITH HEADER ROW LOCATION
'../../testing/data/csv/r_cte_balance.csv'
Review Comment:
BTW if you wanted to avoid having to check in external files, you could use
`CREATE TABLE AS VALUES`
For example:
https://github.com/apache/arrow-datafusion/blob/fc752557204f4b52ab4cb38b5caff99b1b73b902/datafusion/sqllogictest/test_files/aggregate.slt#L41-L45
##########
datafusion/physical-plan/src/recursive_query.rs:
##########
@@ -0,0 +1,373 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines the recursive query plan
+
+use std::any::Any;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use super::expressions::PhysicalSortExpr;
+use super::metrics::BaselineMetrics;
+use super::RecordBatchStream;
+use super::{
+ metrics::{ExecutionPlanMetricsSet, MetricsSet},
+ work_table::{WorkTable, WorkTableExec},
+ SendableRecordBatchStream, Statistics,
+};
+
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use datafusion_common::tree_node::{Transformed, TreeNode};
+use datafusion_common::{not_impl_err, DataFusionError, Result};
+use datafusion_execution::TaskContext;
+use datafusion_physical_expr::Partitioning;
+use futures::{ready, Stream, StreamExt};
+
+use crate::{DisplayAs, DisplayFormatType, ExecutionPlan};
+
+/// Recursive query execution plan.
+///
+/// This plan has two components: a base part (the static term) and
+/// a dynamic part (the recursive term). The execution will start from
+/// the base, and as long as the previous iteration produced at least
+/// a single new row (taking care of the distinction) the recursive
+/// part will be continuously executed.
+///
+/// Before each execution of the dynamic part, the rows from the previous
+/// iteration will be available in a "working table" (not a real table,
+/// can be only accessed using a continuance operation).
+///
+/// Note that there won't be any limit or checks applied to detect
+/// an infinite recursion, so it is up to the planner to ensure that
+/// it won't happen.
+#[derive(Debug)]
+pub struct RecursiveQueryExec {
+ /// Name of the query handler
+ name: String,
+ /// The working table of cte
+ work_table: Arc<WorkTable>,
+ /// The base part (static term)
+ static_term: Arc<dyn ExecutionPlan>,
+ /// The dynamic part (recursive term)
+ recursive_term: Arc<dyn ExecutionPlan>,
+ /// Distinction
+ is_distinct: bool,
+ /// Execution metrics
+ metrics: ExecutionPlanMetricsSet,
+}
+
+impl RecursiveQueryExec {
+ /// Create a new RecursiveQueryExec
+ pub fn try_new(
+ name: String,
+ static_term: Arc<dyn ExecutionPlan>,
+ recursive_term: Arc<dyn ExecutionPlan>,
+ is_distinct: bool,
+ ) -> Result<Self> {
+ // Each recursive query needs its own work table
+ let work_table = Arc::new(WorkTable::new());
+ // Use the same work table for both the WorkTableExec and the
recursive term
+ let recursive_term = assign_work_table(recursive_term,
work_table.clone())?;
+ Ok(RecursiveQueryExec {
+ name,
+ static_term,
+ recursive_term,
+ is_distinct,
+ work_table,
+ metrics: ExecutionPlanMetricsSet::new(),
+ })
+ }
+}
+
+impl ExecutionPlan for RecursiveQueryExec {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> SchemaRef {
+ self.static_term.schema()
+ }
+
+ fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+ vec![self.static_term.clone(), self.recursive_term.clone()]
+ }
+
+ // Distribution on a recursive query is really tricky to handle.
+ // For now, we are going to use a single partition but in the
+ // future we might find a better way to handle this.
+ fn output_partitioning(&self) -> Partitioning {
+ Partitioning::UnknownPartitioning(1)
+ }
+
+ // TODO: control these hints and see whether we can
Review Comment:
yeah, this would be pretty tricky -- I think the choices you have below are
the simple (and correct) ones, and excellent choices for the first
implementation 👍
##########
datafusion/physical-plan/src/recursive_query.rs:
##########
@@ -0,0 +1,373 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines the recursive query plan
+
+use std::any::Any;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use super::expressions::PhysicalSortExpr;
+use super::metrics::BaselineMetrics;
+use super::RecordBatchStream;
+use super::{
+ metrics::{ExecutionPlanMetricsSet, MetricsSet},
+ work_table::{WorkTable, WorkTableExec},
+ SendableRecordBatchStream, Statistics,
+};
+
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use datafusion_common::tree_node::{Transformed, TreeNode};
+use datafusion_common::{not_impl_err, DataFusionError, Result};
+use datafusion_execution::TaskContext;
+use datafusion_physical_expr::Partitioning;
+use futures::{ready, Stream, StreamExt};
+
+use crate::{DisplayAs, DisplayFormatType, ExecutionPlan};
+
+/// Recursive query execution plan.
+///
+/// This plan has two components: a base part (the static term) and
+/// a dynamic part (the recursive term). The execution will start from
+/// the base, and as long as the previous iteration produced at least
+/// a single new row (taking care of the distinction) the recursive
+/// part will be continuously executed.
+///
+/// Before each execution of the dynamic part, the rows from the previous
+/// iteration will be available in a "working table" (not a real table,
+/// can be only accessed using a continuance operation).
+///
+/// Note that there won't be any limit or checks applied to detect
+/// an infinite recursion, so it is up to the planner to ensure that
+/// it won't happen.
+#[derive(Debug)]
+pub struct RecursiveQueryExec {
+ /// Name of the query handler
+ name: String,
+ /// The working table of cte
+ work_table: Arc<WorkTable>,
+ /// The base part (static term)
+ static_term: Arc<dyn ExecutionPlan>,
+ /// The dynamic part (recursive term)
+ recursive_term: Arc<dyn ExecutionPlan>,
+ /// Distinction
+ is_distinct: bool,
+ /// Execution metrics
+ metrics: ExecutionPlanMetricsSet,
+}
+
+impl RecursiveQueryExec {
+ /// Create a new RecursiveQueryExec
+ pub fn try_new(
+ name: String,
+ static_term: Arc<dyn ExecutionPlan>,
+ recursive_term: Arc<dyn ExecutionPlan>,
+ is_distinct: bool,
+ ) -> Result<Self> {
+ // Each recursive query needs its own work table
+ let work_table = Arc::new(WorkTable::new());
+ // Use the same work table for both the WorkTableExec and the
recursive term
+ let recursive_term = assign_work_table(recursive_term,
work_table.clone())?;
+ Ok(RecursiveQueryExec {
+ name,
+ static_term,
+ recursive_term,
+ is_distinct,
+ work_table,
+ metrics: ExecutionPlanMetricsSet::new(),
+ })
+ }
+}
+
+impl ExecutionPlan for RecursiveQueryExec {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> SchemaRef {
+ self.static_term.schema()
+ }
+
+ fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+ vec![self.static_term.clone(), self.recursive_term.clone()]
+ }
+
+ // Distribution on a recursive query is really tricky to handle.
+ // For now, we are going to use a single partition but in the
+ // future we might find a better way to handle this.
+ fn output_partitioning(&self) -> Partitioning {
+ Partitioning::UnknownPartitioning(1)
+ }
+
+ // TODO: control these hints and see whether we can
+ // infer some from the child plans (static/recurisve terms).
+ fn maintains_input_order(&self) -> Vec<bool> {
+ vec![false, false]
+ }
+
+ fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+ vec![false, false]
+ }
+
+ fn required_input_distribution(&self) ->
Vec<datafusion_physical_expr::Distribution> {
+ vec![
+ datafusion_physical_expr::Distribution::SinglePartition,
+ datafusion_physical_expr::Distribution::SinglePartition,
+ ]
+ }
+
+ fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+ None
+ }
+
+ fn with_new_children(
+ self: Arc<Self>,
+ children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ Ok(Arc::new(RecursiveQueryExec {
+ name: self.name.clone(),
+ static_term: children[0].clone(),
+ recursive_term: children[1].clone(),
+ is_distinct: self.is_distinct,
+ work_table: self.work_table.clone(),
+ metrics: self.metrics.clone(),
+ }))
+ }
+
+ fn execute(
+ &self,
+ partition: usize,
+ context: Arc<TaskContext>,
+ ) -> Result<SendableRecordBatchStream> {
+ // TODO: we might be able to handle multiple partitions in the future.
+ if partition != 0 {
+ return Err(DataFusionError::Internal(format!(
+ "RecursiveQueryExec got an invalid partition {} (expected 0)",
+ partition
+ )));
+ }
+
+ let static_stream = self.static_term.execute(partition,
context.clone())?;
+ let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
+ Ok(Box::pin(RecursiveQueryStream::new(
+ context,
+ self.work_table.clone(),
+ self.recursive_term.clone(),
+ static_stream,
+ baseline_metrics,
+ )))
+ }
+
+ fn metrics(&self) -> Option<MetricsSet> {
+ Some(self.metrics.clone_inner())
+ }
+
+ fn statistics(&self) -> Result<Statistics> {
+ Ok(Statistics::new_unknown(&self.schema()))
+ }
+}
+
+impl DisplayAs for RecursiveQueryExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(f, "RecursiveQueryExec: is_distinct={}",
self.is_distinct)
+ }
+ }
+ }
+}
+
+/// The actual logic of the recursive queries happens during the streaming
+/// process. A simplified version of the algorithm is the following:
+///
+/// buffer = []
+///
+/// while batch := static_stream.next():
+/// buffer.push(batch)
+/// yield buffer
+///
+/// while buffer.len() > 0:
+/// sender, receiver = Channel()
+/// register_continuation(handle_name, receiver)
+/// sender.send(buffer.drain())
+/// recursive_stream = recursive_term.execute()
+/// while batch := recursive_stream.next():
+/// buffer.append(batch)
+/// yield buffer
+///
+struct RecursiveQueryStream {
+ /// The context to be used for managing handlers & executing new tasks
+ task_context: Arc<TaskContext>,
+ /// The working table state, representing the self referencing cte table
+ work_table: Arc<WorkTable>,
+ /// The dynamic part (recursive term) as is (without being executed)
+ recursive_term: Arc<dyn ExecutionPlan>,
+ /// The static part (static term) as a stream. If the processing of this
+ /// part is completed, then it will be None.
+ static_stream: Option<SendableRecordBatchStream>,
+ /// The dynamic part (recursive term) as a stream. If the processing of
this
+ /// part has not started yet, or has been completed, then it will be None.
+ recursive_stream: Option<SendableRecordBatchStream>,
+ /// The schema of the output.
+ schema: SchemaRef,
+ /// In-memory buffer for storing a copy of the current results. Will be
+ /// cleared after each iteration.
+ buffer: Vec<RecordBatch>,
+ // /// Metrics.
+ _baseline_metrics: BaselineMetrics,
+}
+
+impl RecursiveQueryStream {
+ /// Create a new recursive query stream
+ fn new(
+ task_context: Arc<TaskContext>,
+ work_table: Arc<WorkTable>,
+ recursive_term: Arc<dyn ExecutionPlan>,
+ static_stream: SendableRecordBatchStream,
+ baseline_metrics: BaselineMetrics,
+ ) -> Self {
+ let schema = static_stream.schema();
+ Self {
+ task_context,
+ work_table,
+ recursive_term,
+ static_stream: Some(static_stream),
+ recursive_stream: None,
+ schema,
+ buffer: vec![],
+ _baseline_metrics: baseline_metrics,
+ }
+ }
+
+ /// Push a clone of the given batch to the in memory buffer, and then
return
+ /// a poll with it.
+ fn push_batch(
+ mut self: std::pin::Pin<&mut Self>,
+ batch: RecordBatch,
+ ) -> Poll<Option<Result<RecordBatch>>> {
+ self.buffer.push(batch.clone());
+ Poll::Ready(Some(Ok(batch)))
+ }
+
+ /// Start polling for the next iteration, will be called either after the
static term
+ /// is completed or another term is completed. It will follow the
algorithm above on
+ /// to check whether the recursion has ended.
+ fn poll_next_iteration(
+ mut self: std::pin::Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Result<RecordBatch>>> {
+ let total_length = self
+ .buffer
+ .iter()
+ .fold(0, |acc, batch| acc + batch.num_rows());
+
+ if total_length == 0 {
+ return Poll::Ready(None);
+ }
+
+ // Update the work table with the current buffer
+ let batches = self.buffer.drain(..).collect();
+ self.work_table.write(batches);
+
+ // We always execute (and re-execute iteratively) the first partition.
+ // Downstream plans should not expect any partitioning.
+ let partition = 0;
+
+ self.recursive_stream = Some(
+ self.recursive_term
+ .execute(partition, self.task_context.clone())?,
+ );
+ self.poll_next(cx)
+ }
+}
+
+fn assign_work_table(
+ plan: Arc<dyn ExecutionPlan>,
+ work_table: Arc<WorkTable>,
+) -> Result<Arc<dyn ExecutionPlan>> {
+ let mut work_table_refs = 0;
+ plan.transform_down_mut(&mut |plan| {
+ if let Some(exec) = plan.as_any().downcast_ref::<WorkTableExec>() {
+ if work_table_refs > 0 {
+ not_impl_err!(
+ "Multiple recursive references to the same CTE are not
supported"
+ )
+ } else {
+ work_table_refs += 1;
+ Ok(Transformed::Yes(Arc::new(
+ exec.with_work_table(work_table.clone()),
+ )))
+ }
+ } else if plan.as_any().is::<RecursiveQueryExec>() {
+ not_impl_err!("Recursive queries cannot be nested")
+ } else {
+ Ok(Transformed::No(plan))
+ }
+ })
+}
+
+impl Stream for RecursiveQueryStream {
+ type Item = Result<RecordBatch>;
+
+ fn poll_next(
+ mut self: std::pin::Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
+ // TODO: we should use this poll to record some metrics!
+ if let Some(static_stream) = &mut self.static_stream {
Review Comment:
this is very nicely written and easy to follow 👍
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]