This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 20bc365e29 refactor(7181): move streaming_merge() into separate mod from the merge mod. (#7799)
20bc365e29 is described below
commit 20bc365e2908ee7731bda7a82b0e29da2665b8bf
Author: wiedld <[email protected]>
AuthorDate: Fri Oct 13 02:24:49 2023 -0400
refactor(7181): move streaming_merge() into separate mod from the merge mod. (#7799)

The merge mod contains SortPreservingMergeStream, which holds the loser tree.
This SortPreservingMergeStream struct will be used repeatedly as part of the
cascading merge; in turn, the cascading merge will be used to implement the
streaming_merge() function.
---
datafusion/physical-plan/src/sorts/merge.rs | 73 ++---------------
datafusion/physical-plan/src/sorts/mod.rs | 5 +-
datafusion/physical-plan/src/sorts/sort.rs | 2 +-
.../physical-plan/src/sorts/streaming_merge.rs | 92 ++++++++++++++++++++++
4 files changed, 101 insertions(+), 71 deletions(-)
diff --git a/datafusion/physical-plan/src/sorts/merge.rs
b/datafusion/physical-plan/src/sorts/merge.rs
index 67685509ab..e60baf2cd8 100644
--- a/datafusion/physical-plan/src/sorts/merge.rs
+++ b/datafusion/physical-plan/src/sorts/merge.rs
@@ -21,84 +21,21 @@
use crate::metrics::BaselineMetrics;
use crate::sorts::builder::BatchBuilder;
use crate::sorts::cursor::Cursor;
-use crate::sorts::stream::{FieldCursorStream, PartitionedStream,
RowCursorStream};
-use crate::{PhysicalSortExpr, RecordBatchStream, SendableRecordBatchStream};
-use arrow::datatypes::{DataType, SchemaRef};
+use crate::sorts::stream::PartitionedStream;
+use crate::RecordBatchStream;
+use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
-use arrow_array::*;
use datafusion_common::Result;
use datafusion_execution::memory_pool::MemoryReservation;
use futures::Stream;
use std::pin::Pin;
use std::task::{ready, Context, Poll};
-macro_rules! primitive_merge_helper {
- ($t:ty, $($v:ident),+) => {
- merge_helper!(PrimitiveArray<$t>, $($v),+)
- };
-}
-
-macro_rules! merge_helper {
- ($t:ty, $sort:ident, $streams:ident, $schema:ident,
$tracking_metrics:ident, $batch_size:ident, $fetch:ident, $reservation:ident)
=> {{
- let streams = FieldCursorStream::<$t>::new($sort, $streams);
- return Ok(Box::pin(SortPreservingMergeStream::new(
- Box::new(streams),
- $schema,
- $tracking_metrics,
- $batch_size,
- $fetch,
- $reservation,
- )));
- }};
-}
-
-/// Perform a streaming merge of [`SendableRecordBatchStream`] based on
provided sort expressions
-/// while preserving order.
-pub fn streaming_merge(
- streams: Vec<SendableRecordBatchStream>,
- schema: SchemaRef,
- expressions: &[PhysicalSortExpr],
- metrics: BaselineMetrics,
- batch_size: usize,
- fetch: Option<usize>,
- reservation: MemoryReservation,
-) -> Result<SendableRecordBatchStream> {
- // Special case single column comparisons with optimized cursor
implementations
- if expressions.len() == 1 {
- let sort = expressions[0].clone();
- let data_type = sort.expr.data_type(schema.as_ref())?;
- downcast_primitive! {
- data_type => (primitive_merge_helper, sort, streams, schema,
metrics, batch_size, fetch, reservation),
- DataType::Utf8 => merge_helper!(StringArray, sort, streams,
schema, metrics, batch_size, fetch, reservation)
- DataType::LargeUtf8 => merge_helper!(LargeStringArray, sort,
streams, schema, metrics, batch_size, fetch, reservation)
- DataType::Binary => merge_helper!(BinaryArray, sort, streams,
schema, metrics, batch_size, fetch, reservation)
- DataType::LargeBinary => merge_helper!(LargeBinaryArray, sort,
streams, schema, metrics, batch_size, fetch, reservation)
- _ => {}
- }
- }
-
- let streams = RowCursorStream::try_new(
- schema.as_ref(),
- expressions,
- streams,
- reservation.new_empty(),
- )?;
-
- Ok(Box::pin(SortPreservingMergeStream::new(
- Box::new(streams),
- schema,
- metrics,
- batch_size,
- fetch,
- reservation,
- )))
-}
-
/// A fallible [`PartitionedStream`] of [`Cursor`] and [`RecordBatch`]
type CursorStream<C> = Box<dyn PartitionedStream<Output = Result<(C,
RecordBatch)>>>;
#[derive(Debug)]
-struct SortPreservingMergeStream<C> {
+pub(crate) struct SortPreservingMergeStream<C> {
in_progress: BatchBuilder,
/// The sorted input streams to merge together
@@ -162,7 +99,7 @@ struct SortPreservingMergeStream<C> {
}
impl<C: Cursor> SortPreservingMergeStream<C> {
- fn new(
+ pub(crate) fn new(
streams: CursorStream<C>,
schema: SchemaRef,
metrics: BaselineMetrics,
diff --git a/datafusion/physical-plan/src/sorts/mod.rs
b/datafusion/physical-plan/src/sorts/mod.rs
index dff39db423..8a1184d3c2 100644
--- a/datafusion/physical-plan/src/sorts/mod.rs
+++ b/datafusion/physical-plan/src/sorts/mod.rs
@@ -20,10 +20,11 @@
mod builder;
mod cursor;
mod index;
-pub mod merge;
+mod merge;
pub mod sort;
pub mod sort_preserving_merge;
mod stream;
+pub mod streaming_merge;
pub use index::RowIndex;
-pub(crate) use merge::streaming_merge;
+pub(crate) use streaming_merge::streaming_merge;
diff --git a/datafusion/physical-plan/src/sorts/sort.rs
b/datafusion/physical-plan/src/sorts/sort.rs
index 703f80d90d..a56f8fec68 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -24,7 +24,7 @@ use crate::expressions::PhysicalSortExpr;
use crate::metrics::{
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
};
-use crate::sorts::merge::streaming_merge;
+use crate::sorts::streaming_merge::streaming_merge;
use crate::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter};
use crate::topk::TopK;
use crate::{
diff --git a/datafusion/physical-plan/src/sorts/streaming_merge.rs
b/datafusion/physical-plan/src/sorts/streaming_merge.rs
new file mode 100644
index 0000000000..96d180027e
--- /dev/null
+++ b/datafusion/physical-plan/src/sorts/streaming_merge.rs
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Merge that handles an arbitrary number of streaming inputs.
+//! This is an order-preserving merge.
+
+use crate::metrics::BaselineMetrics;
+use crate::sorts::{
+ merge::SortPreservingMergeStream,
+ stream::{FieldCursorStream, RowCursorStream},
+};
+use crate::{PhysicalSortExpr, SendableRecordBatchStream};
+use arrow::datatypes::{DataType, SchemaRef};
+use arrow_array::*;
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::MemoryReservation;
+
+/// Handler macro named in the `downcast_primitive!` invocation inside
+/// `streaming_merge`. It is expected to receive the matched primitive arrow
+/// type as `$t`, and it forwards to `merge_helper!` with `PrimitiveArray<$t>`
+/// as the array type, passing every remaining identifier through unchanged.
+macro_rules! primitive_merge_helper {
+ ($t:ty, $($v:ident),+) => {
+ merge_helper!(PrimitiveArray<$t>, $($v),+)
+ };
+}
+
+/// Builds the merge stream for a single sort column whose arrays have type `$t`.
+///
+/// Wraps `$streams` in a `FieldCursorStream::<$t>` keyed on `$sort`, boxes it,
+/// and passes it with `$schema`, `$tracking_metrics`, `$batch_size`, `$fetch`
+/// and `$reservation` to `SortPreservingMergeStream::new`.
+///
+/// Note: the expansion contains `return`, so it exits the *enclosing*
+/// function (here `streaming_merge`) with `Ok(..)` instead of producing a
+/// value in place. It can only be used inside a function returning
+/// `Result<SendableRecordBatchStream>`.
+macro_rules! merge_helper {
+ ($t:ty, $sort:ident, $streams:ident, $schema:ident,
$tracking_metrics:ident, $batch_size:ident, $fetch:ident, $reservation:ident)
=> {{
+ let streams = FieldCursorStream::<$t>::new($sort, $streams);
+ return Ok(Box::pin(SortPreservingMergeStream::new(
+ Box::new(streams),
+ $schema,
+ $tracking_metrics,
+ $batch_size,
+ $fetch,
+ $reservation,
+ )));
+ }};
+}
+
+/// Perform an order-preserving streaming merge of [`SendableRecordBatchStream`]s
+/// based on the provided sort `expressions`.
+///
+/// NOTE(review): presumably every input stream must already be sorted by
+/// `expressions`; confirm against callers.
+///
+/// If there is exactly one sort expression and its data type is a primitive
+/// type, `Utf8`, `LargeUtf8`, `Binary` or `LargeBinary`, a specialized
+/// `FieldCursorStream` over that column is used. Otherwise the inputs are
+/// wrapped in a `RowCursorStream`. This covers several sort expressions, or
+/// a single column of any other type.
+///
+/// `metrics`, `batch_size` and `fetch` are forwarded unchanged to
+/// `SortPreservingMergeStream::new`. `reservation` is moved into the returned
+/// merge stream. On the row-based path, the `RowCursorStream` is given a
+/// separate `reservation.new_empty()`.
+///
+/// # Errors
+///
+/// Returns an error if the data type of the single sort expression cannot be
+/// resolved against `schema`, or if `RowCursorStream::try_new` fails.
+pub fn streaming_merge(
+ streams: Vec<SendableRecordBatchStream>,
+ schema: SchemaRef,
+ expressions: &[PhysicalSortExpr],
+ metrics: BaselineMetrics,
+ batch_size: usize,
+ fetch: Option<usize>,
+ reservation: MemoryReservation,
+) -> Result<SendableRecordBatchStream> {
+ // Special case single column comparisons with optimized cursor
+ // implementations. Each matching arm returns early via `merge_helper!`;
+ // the `_` arm falls through to the row-based path below.
+ if expressions.len() == 1 {
+ let sort = expressions[0].clone();
+ let data_type = sort.expr.data_type(schema.as_ref())?;
+ downcast_primitive! {
+ data_type => (primitive_merge_helper, sort, streams, schema,
metrics, batch_size, fetch, reservation),
+ DataType::Utf8 => merge_helper!(StringArray, sort, streams,
schema, metrics, batch_size, fetch, reservation)
+ DataType::LargeUtf8 => merge_helper!(LargeStringArray, sort,
streams, schema, metrics, batch_size, fetch, reservation)
+ DataType::Binary => merge_helper!(BinaryArray, sort, streams,
schema, metrics, batch_size, fetch, reservation)
+ DataType::LargeBinary => merge_helper!(LargeBinaryArray, sort,
streams, schema, metrics, batch_size, fetch, reservation)
+ _ => {}
+ }
+ }
+
+ // General path: multiple sort expressions, or a single column whose type
+ // has no specialized cursor. Rows are compared via `RowCursorStream`.
+ let streams = RowCursorStream::try_new(
+ schema.as_ref(),
+ expressions,
+ streams,
+ reservation.new_empty(),
+ )?;
+
+ Ok(Box::pin(SortPreservingMergeStream::new(
+ Box::new(streams),
+ schema,
+ metrics,
+ batch_size,
+ fetch,
+ reservation,
+ )))
+}