This is an automated email from the ASF dual-hosted git repository.

comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new f1360b8486 Minor: move batch spilling methods to `lib.rs` to make it reusable (#11154)
f1360b8486 is described below

commit f1360b8486641cbac15212466eddef5bd6503708
Author: Oleks V <[email protected]>
AuthorDate: Fri Jun 28 09:05:49 2024 -0700

    Minor: move batch spilling methods to `lib.rs` to make it reusable (#11154)
---
 .../physical-plan/src/aggregates/row_hash.rs       |  6 +-
 datafusion/physical-plan/src/lib.rs                | 64 ++++++++++++++-
 datafusion/physical-plan/src/sorts/sort.rs         | 95 +++-------------------
 3 files changed, 78 insertions(+), 87 deletions(-)

diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs
index ad0860b93a..27577e6c8b 100644
--- a/datafusion/physical-plan/src/aggregates/row_hash.rs
+++ b/datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -29,10 +29,10 @@ use crate::aggregates::{
 };
 use crate::common::IPCWriter;
 use crate::metrics::{BaselineMetrics, RecordOutput};
-use crate::sorts::sort::{read_spill_as_stream, sort_batch};
+use crate::sorts::sort::sort_batch;
 use crate::sorts::streaming_merge;
 use crate::stream::RecordBatchStreamAdapter;
-use crate::{aggregates, ExecutionPlan, PhysicalExpr};
+use crate::{aggregates, read_spill_as_stream, ExecutionPlan, PhysicalExpr};
 use crate::{RecordBatchStream, SendableRecordBatchStream};
 
 use arrow::array::*;
@@ -752,7 +752,7 @@ impl GroupedHashAggregateStream {
             })),
         )));
         for spill in self.spill_state.spills.drain(..) {
-            let stream = read_spill_as_stream(spill, schema.clone())?;
+            let stream = read_spill_as_stream(spill, schema.clone(), 2)?;
             streams.push(stream);
         }
         self.spill_state.is_stream_merging = true;
diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs
index c648547c98..aef5b30796 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -19,6 +19,9 @@
 
 use std::any::Any;
 use std::fmt::Debug;
+use std::fs::File;
+use std::io::BufReader;
+use std::path::{Path, PathBuf};
 use std::sync::Arc;
 
 use crate::coalesce_partitions::CoalescePartitionsExec;
@@ -28,15 +31,18 @@ use crate::repartition::RepartitionExec;
 use crate::sorts::sort_preserving_merge::SortPreservingMergeExec;
 
 use arrow::datatypes::SchemaRef;
+use arrow::ipc::reader::FileReader;
 use arrow::record_batch::RecordBatch;
 use datafusion_common::config::ConfigOptions;
-use datafusion_common::Result;
+use datafusion_common::{exec_datafusion_err, Result};
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::{
     EquivalenceProperties, LexOrdering, PhysicalSortExpr, PhysicalSortRequirement,
 };
 
 use futures::stream::TryStreamExt;
+use log::debug;
+use tokio::sync::mpsc::Sender;
 use tokio::task::JoinSet;
 
 mod ordering;
@@ -87,8 +93,13 @@ pub use datafusion_physical_expr::{
 };
 
 // Backwards compatibility
+use crate::common::IPCWriter;
 pub use crate::stream::EmptyRecordBatchStream;
+use crate::stream::RecordBatchReceiverStream;
+use datafusion_execution::disk_manager::RefCountedTempFile;
+use datafusion_execution::memory_pool::human_readable_size;
 pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+
 pub mod udaf {
     pub use datafusion_physical_expr_common::aggregate::{
         create_aggregate_expr, AggregateFunctionExpr,
@@ -799,6 +810,57 @@ pub fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
     actual.iter().map(|elem| elem.to_string()).collect()
 }
 
+/// Read spilled batches from the disk
+///
+/// `path` - temp file
+/// `schema` - batches schema, should be the same across batches
+/// `buffer` - internal buffer of capacity batches
+pub fn read_spill_as_stream(
+    path: RefCountedTempFile,
+    schema: SchemaRef,
+    buffer: usize,
+) -> Result<SendableRecordBatchStream> {
+    let mut builder = RecordBatchReceiverStream::builder(schema, buffer);
+    let sender = builder.tx();
+
+    builder.spawn_blocking(move || read_spill(sender, path.path()));
+
+    Ok(builder.build())
+}
+
+/// Spills in-memory `batches` to disk.
+///
+/// Returns total number of the rows spilled to disk.
+pub fn spill_record_batches(
+    batches: Vec<RecordBatch>,
+    path: PathBuf,
+    schema: SchemaRef,
+) -> Result<usize> {
+    let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?;
+    for batch in batches {
+        writer.write(&batch)?;
+    }
+    writer.finish()?;
+    debug!(
+        "Spilled {} batches of total {} rows to disk, memory released {}",
+        writer.num_batches,
+        writer.num_rows,
+        human_readable_size(writer.num_bytes),
+    );
+    Ok(writer.num_rows)
+}
+
+fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> {
+    let file = BufReader::new(File::open(path)?);
+    let reader = FileReader::try_new(file, None)?;
+    for batch in reader {
+        sender
+            .blocking_send(batch.map_err(Into::into))
+            .map_err(|e| exec_datafusion_err!("{e}"))?;
+    }
+    Ok(())
+}
+
 #[cfg(test)]
 mod tests {
     use std::any::Any;
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index 2a48625345..47901591c8 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -22,45 +22,38 @@
 use std::any::Any;
 use std::fmt;
 use std::fmt::{Debug, Formatter};
-use std::fs::File;
-use std::io::BufReader;
-use std::path::{Path, PathBuf};
 use std::sync::Arc;
 
-use crate::common::{spawn_buffered, IPCWriter};
+use crate::common::spawn_buffered;
 use crate::expressions::PhysicalSortExpr;
 use crate::metrics::{
     BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
 };
 use crate::sorts::streaming_merge::streaming_merge;
-use crate::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter};
+use crate::stream::RecordBatchStreamAdapter;
 use crate::topk::TopK;
 use crate::{
-    DisplayAs, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionMode,
-    ExecutionPlan, ExecutionPlanProperties, Partitioning, PlanProperties,
-    SendableRecordBatchStream, Statistics,
+    read_spill_as_stream, spill_record_batches, DisplayAs, DisplayFormatType,
+    Distribution, EmptyRecordBatchStream, ExecutionMode, ExecutionPlan,
+    ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream,
+    Statistics,
 };
 
 use arrow::compute::{concat_batches, lexsort_to_indices, take, SortColumn};
 use arrow::datatypes::SchemaRef;
-use arrow::ipc::reader::FileReader;
 use arrow::record_batch::RecordBatch;
 use arrow::row::{RowConverter, SortField};
 use arrow_array::{Array, RecordBatchOptions, UInt32Array};
 use arrow_schema::DataType;
-use datafusion_common::{exec_err, DataFusionError, Result};
-use datafusion_common_runtime::SpawnedTask;
+use datafusion_common::{DataFusionError, Result};
 use datafusion_execution::disk_manager::RefCountedTempFile;
-use datafusion_execution::memory_pool::{
-    human_readable_size, MemoryConsumer, MemoryReservation,
-};
+use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
 use datafusion_execution::runtime_env::RuntimeEnv;
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::LexOrdering;
 
 use futures::{StreamExt, TryStreamExt};
-use log::{debug, error, trace};
-use tokio::sync::mpsc::Sender;
+use log::{debug, trace};
 
 struct ExternalSorterMetrics {
     /// metrics
@@ -345,7 +338,7 @@ impl ExternalSorter {
                         spill.path()
                     )));
                 }
-                let stream = read_spill_as_stream(spill, self.schema.clone())?;
+                let stream = read_spill_as_stream(spill, self.schema.clone(), 2)?;
                 streams.push(stream);
             }
 
@@ -402,7 +395,7 @@ impl ExternalSorter {
         let spill_file = self.runtime.disk_manager.create_tmp_file("Sorting")?;
         let batches = std::mem::take(&mut self.in_mem_batches);
         let spilled_rows =
-            spill_sorted_batches(batches, spill_file.path(), self.schema.clone()).await?;
+            spill_record_batches(batches, spill_file.path().into(), self.schema.clone())?;
         let used = self.reservation.free();
         self.metrics.spill_count.add(1);
         self.metrics.spilled_bytes.add(used);
@@ -667,70 +660,6 @@ pub(crate) fn lexsort_to_indices_multi_columns(
     Ok(indices)
 }
 
-/// Spills sorted `in_memory_batches` to disk.
-///
-/// Returns number of the rows spilled to disk.
-async fn spill_sorted_batches(
-    batches: Vec<RecordBatch>,
-    path: &Path,
-    schema: SchemaRef,
-) -> Result<usize> {
-    let path: PathBuf = path.into();
-    let task = SpawnedTask::spawn_blocking(move || write_sorted(batches, path, schema));
-    match task.join().await {
-        Ok(r) => r,
-        Err(e) => exec_err!("Error occurred while spilling {e}"),
-    }
-}
-
-pub(crate) fn read_spill_as_stream(
-    path: RefCountedTempFile,
-    schema: SchemaRef,
-) -> Result<SendableRecordBatchStream> {
-    let mut builder = RecordBatchReceiverStream::builder(schema, 2);
-    let sender = builder.tx();
-
-    builder.spawn_blocking(move || {
-        let result = read_spill(sender, path.path());
-        if let Err(e) = &result {
-            error!("Failure while reading spill file: {:?}. Error: {}", path, e);
-        }
-        result
-    });
-
-    Ok(builder.build())
-}
-
-fn write_sorted(
-    batches: Vec<RecordBatch>,
-    path: PathBuf,
-    schema: SchemaRef,
-) -> Result<usize> {
-    let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?;
-    for batch in batches {
-        writer.write(&batch)?;
-    }
-    writer.finish()?;
-    debug!(
-        "Spilled {} batches of total {} rows to disk, memory released {}",
-        writer.num_batches,
-        writer.num_rows,
-        human_readable_size(writer.num_bytes),
-    );
-    Ok(writer.num_rows)
-}
-
-fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> {
-    let file = BufReader::new(File::open(path)?);
-    let reader = FileReader::try_new(file, None)?;
-    for batch in reader {
-        sender
-            .blocking_send(batch.map_err(Into::into))
-            .map_err(|e| DataFusionError::Execution(format!("{e}")))?;
-    }
-    Ok(())
-}
-
 /// Sort execution plan.
 ///
 /// Support sorting datasets that are larger than the memory allotted
@@ -776,7 +705,7 @@ impl SortExec {
     /// Specify the partitioning behavior of this sort exec
     ///
     /// If `preserve_partitioning` is true, sorts each partition
-    /// individually, producing one sorted strema for each input partition.
+    /// individually, producing one sorted stream for each input partition.
     ///
     /// If `preserve_partitioning` is false, sorts and merges all
     /// input partitions producing a single, sorted partition.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to