yjshen commented on a change in pull request #1596: URL: https://github.com/apache/arrow-datafusion/pull/1596#discussion_r787541398
########## File path: datafusion/src/physical_plan/sorts/sort.rs ########## @@ -15,47 +15,450 @@ // specific language governing permissions and limitations // under the License. -//! Defines the SORT plan +//! Sort that deals with an arbitrary size of the input. +//! It will do in-memory sorting if it has enough memory budget +//! but spills to disk if needed. use crate::error::{DataFusionError, Result}; +use crate::execution::memory_manager::{ + ConsumerType, MemoryConsumer, MemoryConsumerId, MemoryManager, +}; use crate::execution::runtime_env::RuntimeEnv; -use crate::physical_plan::common::AbortOnDropSingle; +use crate::physical_plan::common::{batch_byte_size, IPCWriter, SizedRecordBatchStream}; use crate::physical_plan::expressions::PhysicalSortExpr; use crate::physical_plan::metrics::{ - BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput, + BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricsSet, Time, }; +use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeStream; +use crate::physical_plan::sorts::SortedStream; +use crate::physical_plan::stream::RecordBatchReceiverStream; use crate::physical_plan::{ - common, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, + DisplayFormatType, Distribution, ExecutionPlan, Partitioning, + SendableRecordBatchStream, Statistics, }; -use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream, Statistics}; +use arrow::array::ArrayRef; pub use arrow::compute::SortOptions; use arrow::compute::{lexsort_to_indices, take, SortColumn, TakeOptions}; use arrow::datatypes::SchemaRef; use arrow::error::Result as ArrowResult; +use arrow::ipc::reader::FileReader; use arrow::record_batch::RecordBatch; -use arrow::{array::ArrayRef, error::ArrowError}; use async_trait::async_trait; -use futures::stream::Stream; -use futures::Future; -use pin_project_lite::pin_project; +use futures::lock::Mutex; +use futures::StreamExt; +use log::{error, info}; use std::any::Any; -use 
std::pin::Pin; +use std::fmt; +use std::fmt::{Debug, Formatter}; +use std::fs::File; +use std::io::BufReader; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; -use std::task::{Context, Poll}; +use std::time::Duration; +use tokio::sync::mpsc::{Receiver as TKReceiver, Sender as TKSender}; +use tokio::task; + +/// Sort arbitrary size of data to get an total order (may spill several times during sorting based on free memory available). +/// +/// The basic architecture of the algorithm: +/// +/// let spills = vec![]; Review comment: Replaced the pseudo code with a prose description. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org