This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 20bc365e29 refactor(7181): move streaming_merge() into separate mod 
from the merge mod. (#7799)
20bc365e29 is described below

commit 20bc365e2908ee7731bda7a82b0e29da2665b8bf
Author: wiedld <[email protected]>
AuthorDate: Fri Oct 13 02:24:49 2023 -0400

    refactor(7181): move streaming_merge() into separate mod from the merge 
mod. (#7799)
    
    Merge mod has the SortPreservingMergeStream, containing the loser tree. 
This SortPreservingMergeStream struct will be used repeatedly as part of the 
cascading merge; in turn, the cascading merge will be implemented for the 
streaming_merge() method.
---
 datafusion/physical-plan/src/sorts/merge.rs        | 73 ++---------------
 datafusion/physical-plan/src/sorts/mod.rs          |  5 +-
 datafusion/physical-plan/src/sorts/sort.rs         |  2 +-
 .../physical-plan/src/sorts/streaming_merge.rs     | 92 ++++++++++++++++++++++
 4 files changed, 101 insertions(+), 71 deletions(-)

diff --git a/datafusion/physical-plan/src/sorts/merge.rs 
b/datafusion/physical-plan/src/sorts/merge.rs
index 67685509ab..e60baf2cd8 100644
--- a/datafusion/physical-plan/src/sorts/merge.rs
+++ b/datafusion/physical-plan/src/sorts/merge.rs
@@ -21,84 +21,21 @@
 use crate::metrics::BaselineMetrics;
 use crate::sorts::builder::BatchBuilder;
 use crate::sorts::cursor::Cursor;
-use crate::sorts::stream::{FieldCursorStream, PartitionedStream, 
RowCursorStream};
-use crate::{PhysicalSortExpr, RecordBatchStream, SendableRecordBatchStream};
-use arrow::datatypes::{DataType, SchemaRef};
+use crate::sorts::stream::PartitionedStream;
+use crate::RecordBatchStream;
+use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
-use arrow_array::*;
 use datafusion_common::Result;
 use datafusion_execution::memory_pool::MemoryReservation;
 use futures::Stream;
 use std::pin::Pin;
 use std::task::{ready, Context, Poll};
 
-macro_rules! primitive_merge_helper {
-    ($t:ty, $($v:ident),+) => {
-        merge_helper!(PrimitiveArray<$t>, $($v),+)
-    };
-}
-
-macro_rules! merge_helper {
-    ($t:ty, $sort:ident, $streams:ident, $schema:ident, 
$tracking_metrics:ident, $batch_size:ident, $fetch:ident, $reservation:ident) 
=> {{
-        let streams = FieldCursorStream::<$t>::new($sort, $streams);
-        return Ok(Box::pin(SortPreservingMergeStream::new(
-            Box::new(streams),
-            $schema,
-            $tracking_metrics,
-            $batch_size,
-            $fetch,
-            $reservation,
-        )));
-    }};
-}
-
-/// Perform a streaming merge of [`SendableRecordBatchStream`] based on 
provided sort expressions
-/// while preserving order.
-pub fn streaming_merge(
-    streams: Vec<SendableRecordBatchStream>,
-    schema: SchemaRef,
-    expressions: &[PhysicalSortExpr],
-    metrics: BaselineMetrics,
-    batch_size: usize,
-    fetch: Option<usize>,
-    reservation: MemoryReservation,
-) -> Result<SendableRecordBatchStream> {
-    // Special case single column comparisons with optimized cursor 
implementations
-    if expressions.len() == 1 {
-        let sort = expressions[0].clone();
-        let data_type = sort.expr.data_type(schema.as_ref())?;
-        downcast_primitive! {
-            data_type => (primitive_merge_helper, sort, streams, schema, 
metrics, batch_size, fetch, reservation),
-            DataType::Utf8 => merge_helper!(StringArray, sort, streams, 
schema, metrics, batch_size, fetch, reservation)
-            DataType::LargeUtf8 => merge_helper!(LargeStringArray, sort, 
streams, schema, metrics, batch_size, fetch, reservation)
-            DataType::Binary => merge_helper!(BinaryArray, sort, streams, 
schema, metrics, batch_size, fetch, reservation)
-            DataType::LargeBinary => merge_helper!(LargeBinaryArray, sort, 
streams, schema, metrics, batch_size, fetch, reservation)
-            _ => {}
-        }
-    }
-
-    let streams = RowCursorStream::try_new(
-        schema.as_ref(),
-        expressions,
-        streams,
-        reservation.new_empty(),
-    )?;
-
-    Ok(Box::pin(SortPreservingMergeStream::new(
-        Box::new(streams),
-        schema,
-        metrics,
-        batch_size,
-        fetch,
-        reservation,
-    )))
-}
-
 /// A fallible [`PartitionedStream`] of [`Cursor`] and [`RecordBatch`]
 type CursorStream<C> = Box<dyn PartitionedStream<Output = Result<(C, 
RecordBatch)>>>;
 
 #[derive(Debug)]
-struct SortPreservingMergeStream<C> {
+pub(crate) struct SortPreservingMergeStream<C> {
     in_progress: BatchBuilder,
 
     /// The sorted input streams to merge together
@@ -162,7 +99,7 @@ struct SortPreservingMergeStream<C> {
 }
 
 impl<C: Cursor> SortPreservingMergeStream<C> {
-    fn new(
+    pub(crate) fn new(
         streams: CursorStream<C>,
         schema: SchemaRef,
         metrics: BaselineMetrics,
diff --git a/datafusion/physical-plan/src/sorts/mod.rs 
b/datafusion/physical-plan/src/sorts/mod.rs
index dff39db423..8a1184d3c2 100644
--- a/datafusion/physical-plan/src/sorts/mod.rs
+++ b/datafusion/physical-plan/src/sorts/mod.rs
@@ -20,10 +20,11 @@
 mod builder;
 mod cursor;
 mod index;
-pub mod merge;
+mod merge;
 pub mod sort;
 pub mod sort_preserving_merge;
 mod stream;
+pub mod streaming_merge;
 
 pub use index::RowIndex;
-pub(crate) use merge::streaming_merge;
+pub(crate) use streaming_merge::streaming_merge;
diff --git a/datafusion/physical-plan/src/sorts/sort.rs 
b/datafusion/physical-plan/src/sorts/sort.rs
index 703f80d90d..a56f8fec68 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -24,7 +24,7 @@ use crate::expressions::PhysicalSortExpr;
 use crate::metrics::{
     BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
 };
-use crate::sorts::merge::streaming_merge;
+use crate::sorts::streaming_merge::streaming_merge;
 use crate::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter};
 use crate::topk::TopK;
 use crate::{
diff --git a/datafusion/physical-plan/src/sorts/streaming_merge.rs 
b/datafusion/physical-plan/src/sorts/streaming_merge.rs
new file mode 100644
index 0000000000..96d180027e
--- /dev/null
+++ b/datafusion/physical-plan/src/sorts/streaming_merge.rs
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Merge that deals with an arbitrary size of streaming inputs.
+//! This is an order-preserving merge.
+
+use crate::metrics::BaselineMetrics;
+use crate::sorts::{
+    merge::SortPreservingMergeStream,
+    stream::{FieldCursorStream, RowCursorStream},
+};
+use crate::{PhysicalSortExpr, SendableRecordBatchStream};
+use arrow::datatypes::{DataType, SchemaRef};
+use arrow_array::*;
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::MemoryReservation;
+
+macro_rules! primitive_merge_helper {
+    ($t:ty, $($v:ident),+) => {
+        merge_helper!(PrimitiveArray<$t>, $($v),+)
+    };
+}
+
+macro_rules! merge_helper {
+    ($t:ty, $sort:ident, $streams:ident, $schema:ident, 
$tracking_metrics:ident, $batch_size:ident, $fetch:ident, $reservation:ident) 
=> {{
+        let streams = FieldCursorStream::<$t>::new($sort, $streams);
+        return Ok(Box::pin(SortPreservingMergeStream::new(
+            Box::new(streams),
+            $schema,
+            $tracking_metrics,
+            $batch_size,
+            $fetch,
+            $reservation,
+        )));
+    }};
+}
+
+/// Perform a streaming merge of [`SendableRecordBatchStream`] based on 
provided sort expressions
+/// while preserving order.
+pub fn streaming_merge(
+    streams: Vec<SendableRecordBatchStream>,
+    schema: SchemaRef,
+    expressions: &[PhysicalSortExpr],
+    metrics: BaselineMetrics,
+    batch_size: usize,
+    fetch: Option<usize>,
+    reservation: MemoryReservation,
+) -> Result<SendableRecordBatchStream> {
+    // Special case single column comparisons with optimized cursor 
implementations
+    if expressions.len() == 1 {
+        let sort = expressions[0].clone();
+        let data_type = sort.expr.data_type(schema.as_ref())?;
+        downcast_primitive! {
+            data_type => (primitive_merge_helper, sort, streams, schema, 
metrics, batch_size, fetch, reservation),
+            DataType::Utf8 => merge_helper!(StringArray, sort, streams, 
schema, metrics, batch_size, fetch, reservation)
+            DataType::LargeUtf8 => merge_helper!(LargeStringArray, sort, 
streams, schema, metrics, batch_size, fetch, reservation)
+            DataType::Binary => merge_helper!(BinaryArray, sort, streams, 
schema, metrics, batch_size, fetch, reservation)
+            DataType::LargeBinary => merge_helper!(LargeBinaryArray, sort, 
streams, schema, metrics, batch_size, fetch, reservation)
+            _ => {}
+        }
+    }
+
+    let streams = RowCursorStream::try_new(
+        schema.as_ref(),
+        expressions,
+        streams,
+        reservation.new_empty(),
+    )?;
+
+    Ok(Box::pin(SortPreservingMergeStream::new(
+        Box::new(streams),
+        schema,
+        metrics,
+        batch_size,
+        fetch,
+        reservation,
+    )))
+}

Reply via email to