This is an automated email from the ASF dual-hosted git repository. ytyou pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push: new 3c4e39ac0c Add compression option to SpillManager (#16268) 3c4e39ac0c is described below commit 3c4e39ac0cf83bd8ead45722a5873bac731b53f1 Author: ding-young <lsyh...@snu.ac.kr> AuthorDate: Fri Jun 20 16:27:31 2025 +0900 Add compression option to SpillManager (#16268) * Add compression option to SpillManager * add test case for spill compression * Chore: fix docs & fmt * set spill compression option with separate api * chore * chore: add comment --- Cargo.lock | 1 + Cargo.toml | 1 + datafusion/common/src/config.rs | 67 ++++++++++ datafusion/core/tests/memory_limit/mod.rs | 82 ++++++++++++- datafusion/execution/src/config.rs | 15 ++- .../physical-plan/src/aggregates/row_hash.rs | 3 +- .../physical-plan/src/joins/sort_merge_join.rs | 7 +- datafusion/physical-plan/src/sorts/sort.rs | 7 +- .../src/spill/in_progress_spill_file.rs | 1 + datafusion/physical-plan/src/spill/mod.rs | 135 ++++++++++++++++++++- .../physical-plan/src/spill/spill_manager.rs | 11 +- .../sqllogictest/test_files/information_schema.slt | 2 + docs/source/user-guide/configs.md | 1 + 13 files changed, 319 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e2e593e625..194483b7ab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -398,6 +398,7 @@ dependencies = [ "arrow-schema", "flatbuffers", "lz4_flex", + "zstd", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index f2cd6f72c7..62b56c0939 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -98,6 +98,7 @@ arrow-flight = { version = "55.1.0", features = [ ] } arrow-ipc = { version = "55.0.0", default-features = false, features = [ "lz4", + "zstd", ] } arrow-ord = { version = "55.0.0", default-features = false } arrow-schema = { version = "55.0.0", default-features = false } diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index c6ac2f0b50..def64517ce 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -17,6 +17,8 @@ //! Runtime configuration, via [`ConfigOptions`] +use arrow_ipc::CompressionType; + use crate::error::_config_err; use crate::parsers::CompressionTypeVariant; use crate::utils::get_available_parallelism; @@ -274,6 +276,61 @@ config_namespace! { } } +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] +pub enum SpillCompression { + Zstd, + Lz4Frame, + #[default] + Uncompressed, +} + +impl FromStr for SpillCompression { + type Err = DataFusionError; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + match s.to_ascii_lowercase().as_str() { + "zstd" => Ok(Self::Zstd), + "lz4_frame" => Ok(Self::Lz4Frame), + "uncompressed" | "" => Ok(Self::Uncompressed), + other => Err(DataFusionError::Configuration(format!( + "Invalid Spill file compression type: {other}. Expected one of: zstd, lz4_frame, uncompressed" + ))), + } + } +} + +impl ConfigField for SpillCompression { + fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) { + v.some(key, self, description) + } + + fn set(&mut self, _: &str, value: &str) -> Result<()> { + *self = SpillCompression::from_str(value)?; + Ok(()) + } +} + +impl Display for SpillCompression { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let str = match self { + Self::Zstd => "zstd", + Self::Lz4Frame => "lz4_frame", + Self::Uncompressed => "uncompressed", + }; + write!(f, "{str}") + } +} + +impl From<SpillCompression> for Option<CompressionType> { + fn from(c: SpillCompression) -> Self { + match c { + SpillCompression::Zstd => Some(CompressionType::ZSTD), + SpillCompression::Lz4Frame => Some(CompressionType::LZ4_FRAME), + SpillCompression::Uncompressed => None, + } + } +} + config_namespace! { /// Options related to query execution /// @@ -330,6 +387,16 @@ config_namespace! { /// the new schema verification step. pub skip_physical_aggregate_schema_check: bool, default = false + /// Sets the compression codec used when spilling data to disk. + /// + /// Since datafusion writes spill files using the Arrow IPC Stream format, + /// only codecs supported by the Arrow IPC Stream Writer are allowed. + /// Valid values are: uncompressed, lz4_frame, zstd. + /// Note: lz4_frame offers faster (de)compression, but typically results in + /// larger spill files. In contrast, zstd achieves + /// higher compression ratios at the cost of slower (de)compression speed. + pub spill_compression: SpillCompression, default = SpillCompression::Uncompressed + /// Specifies the reserved memory for each spillable sort operation to /// facilitate an in-memory merge. /// diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index 4ac779d9c2..2b262d4326 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -28,6 +28,7 @@ use arrow::compute::SortOptions; use arrow::datatypes::{Int32Type, SchemaRef}; use arrow_schema::{DataType, Field, Schema}; use datafusion::assert_batches_eq; +use datafusion::config::SpillCompression; use datafusion::datasource::memory::MemorySourceConfig; use datafusion::datasource::source::DataSourceExec; use datafusion::datasource::{MemTable, TableProvider}; @@ -545,10 +546,11 @@ async fn test_external_sort_zero_merge_reservation() { // Tests for disk limit (`max_temp_directory_size` in `DiskManager`) // ------------------------------------------------------------------ -// Create a new `SessionContext` with speicified disk limit and memory pool limit +// Create a new `SessionContext` with speicified disk limit, memory pool limit, and spill compression codec async fn setup_context( disk_limit: u64, memory_pool_limit: usize, + spill_compression: SpillCompression, ) -> Result<SessionContext> { let disk_manager = DiskManagerBuilder::default() .with_mode(DiskManagerMode::OsTmpDirectory) @@ -570,6 +572,7 @@ async fn setup_context( let config = SessionConfig::new() .with_sort_spill_reservation_bytes(64 * 1024) // 256KB .with_sort_in_place_threshold_bytes(0) + .with_spill_compression(spill_compression) .with_batch_size(64) // To reduce test memory usage .with_target_partitions(1); @@ -580,7 +583,8 @@ async fn setup_context( /// (specified by `max_temp_directory_size` in `DiskManager`) #[tokio::test] async fn test_disk_spill_limit_reached() -> Result<()> { - let ctx = setup_context(1024 * 1024, 1024 * 1024).await?; // 1MB disk limit, 1MB memory limit + let spill_compression = SpillCompression::Uncompressed; + let ctx = setup_context(1024 * 1024, 1024 * 1024, spill_compression).await?; // 1MB disk limit, 1MB memory limit let df = ctx .sql("select * from generate_series(1, 1000000000000) as t1(v1) order by v1") @@ -602,7 +606,8 @@ async fn test_disk_spill_limit_reached() -> Result<()> { #[tokio::test] async fn test_disk_spill_limit_not_reached() -> Result<()> { let disk_spill_limit = 1024 * 1024; // 1MB - let ctx = setup_context(disk_spill_limit, 128 * 1024).await?; // 1MB disk limit, 128KB memory limit + let spill_compression = SpillCompression::Uncompressed; + let ctx = setup_context(disk_spill_limit, 128 * 1024, spill_compression).await?; // 1MB disk limit, 128KB memory limit let df = ctx .sql("select * from generate_series(1, 10000) as t1(v1) order by v1") @@ -630,6 +635,77 @@ async fn test_disk_spill_limit_not_reached() -> Result<()> { Ok(()) } +/// External query should succeed using zstd as spill compression codec and +/// and all temporary spill files are properly cleaned up after execution. +/// Note: This test does not inspect file contents (e.g. magic number), +/// as spill files are automatically deleted on drop. +#[tokio::test] +async fn test_spill_file_compressed_with_zstd() -> Result<()> { + let disk_spill_limit = 1024 * 1024; // 1MB + let spill_compression = SpillCompression::Zstd; + let ctx = setup_context(disk_spill_limit, 128 * 1024, spill_compression).await?; // 1MB disk limit, 128KB memory limit, zstd + + let df = ctx + .sql("select * from generate_series(1, 100000) as t1(v1) order by v1") + .await + .unwrap(); + let plan = df.create_physical_plan().await.unwrap(); + + let task_ctx = ctx.task_ctx(); + let _ = collect_batches(Arc::clone(&plan), task_ctx) + .await + .expect("Query execution failed"); + + let spill_count = plan.metrics().unwrap().spill_count().unwrap(); + let spilled_bytes = plan.metrics().unwrap().spilled_bytes().unwrap(); + + println!("spill count {spill_count}"); + assert!(spill_count > 0); + assert!((spilled_bytes as u64) < disk_spill_limit); + + // Verify that all temporary files have been properly cleaned up by checking + // that the total disk usage tracked by the disk manager is zero + let current_disk_usage = ctx.runtime_env().disk_manager.used_disk_space(); + assert_eq!(current_disk_usage, 0); + + Ok(()) +} + +/// External query should succeed using lz4_frame as spill compression codec and +/// and all temporary spill files are properly cleaned up after execution. +/// Note: This test does not inspect file contents (e.g. magic number), +/// as spill files are automatically deleted on drop. +#[tokio::test] +async fn test_spill_file_compressed_with_lz4_frame() -> Result<()> { + let disk_spill_limit = 1024 * 1024; // 1MB + let spill_compression = SpillCompression::Lz4Frame; + let ctx = setup_context(disk_spill_limit, 128 * 1024, spill_compression).await?; // 1MB disk limit, 128KB memory limit, lz4_frame + + let df = ctx + .sql("select * from generate_series(1, 100000) as t1(v1) order by v1") + .await + .unwrap(); + let plan = df.create_physical_plan().await.unwrap(); + + let task_ctx = ctx.task_ctx(); + let _ = collect_batches(Arc::clone(&plan), task_ctx) + .await + .expect("Query execution failed"); + + let spill_count = plan.metrics().unwrap().spill_count().unwrap(); + let spilled_bytes = plan.metrics().unwrap().spilled_bytes().unwrap(); + + println!("spill count {spill_count}"); + assert!(spill_count > 0); + assert!((spilled_bytes as u64) < disk_spill_limit); + + // Verify that all temporary files have been properly cleaned up by checking + // that the total disk usage tracked by the disk manager is zero + let current_disk_usage = ctx.runtime_env().disk_manager.used_disk_space(); + assert_eq!(current_disk_usage, 0); + + Ok(()) +} /// Run the query with the specified memory limit, /// and verifies the expected errors are returned #[derive(Clone, Debug)] diff --git a/datafusion/execution/src/config.rs b/datafusion/execution/src/config.rs index 1e00a1ce47..c1ee2820c0 100644 --- a/datafusion/execution/src/config.rs +++ b/datafusion/execution/src/config.rs @@ -23,7 +23,7 @@ use std::{ }; use datafusion_common::{ - config::{ConfigExtension, ConfigOptions}, + config::{ConfigExtension, ConfigOptions, SpillCompression}, Result, ScalarValue, }; @@ -258,6 +258,11 @@ impl SessionConfig { self.options.execution.collect_statistics } + /// Compression codec for spill file + pub fn spill_compression(&self) -> SpillCompression { + self.options.execution.spill_compression + } + /// Selects a name for the default catalog and schema pub fn with_default_catalog_and_schema( mut self, @@ -421,6 +426,14 @@ impl SessionConfig { self } + /// Set the compression codec [`spill_compression`] used when spilling data to disk. + /// + /// [`spill_compression`]: datafusion_common::config::ExecutionOptions::spill_compression + pub fn with_spill_compression(mut self, spill_compression: SpillCompression) -> Self { + self.options.execution.spill_compression = spill_compression; + self + } + /// Set the size of [`sort_in_place_threshold_bytes`] to control /// how sort does things. /// diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 405c0b2059..1d659d7280 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -552,7 +552,8 @@ impl GroupedHashAggregateStream { context.runtime_env(), metrics::SpillMetrics::new(&agg.metrics, partition), Arc::clone(&partial_agg_schema), - ); + ) + .with_compression_type(context.session_config().spill_compression()); let spill_state = SpillState { spills: vec![], diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs index 4d635948ed..a8c209a492 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs @@ -61,6 +61,7 @@ use arrow::compute::{ use arrow::datatypes::{DataType, SchemaRef, TimeUnit}; use arrow::error::ArrowError; use arrow::ipc::reader::StreamReader; +use datafusion_common::config::SpillCompression; use datafusion_common::{ exec_err, internal_err, not_impl_err, plan_err, DataFusionError, HashSet, JoinSide, JoinType, NullEquality, Result, @@ -500,6 +501,7 @@ impl ExecutionPlan for SortMergeJoinExec { // create join stream Ok(Box::pin(SortMergeJoinStream::try_new( + context.session_config().spill_compression(), Arc::clone(&self.schema), self.sort_options.clone(), self.null_equality, @@ -1324,6 +1326,8 @@ impl Stream for SortMergeJoinStream { impl SortMergeJoinStream { #[allow(clippy::too_many_arguments)] pub fn try_new( + // Configured via `datafusion.execution.spill_compression`. + spill_compression: SpillCompression, schema: SchemaRef, sort_options: Vec<SortOptions>, null_equality: NullEquality, @@ -1344,7 +1348,8 @@ impl SortMergeJoinStream { Arc::clone(&runtime_env), join_metrics.spill_metrics.clone(), Arc::clone(&buffered_schema), - ); + ) + .with_compression_type(spill_compression); Ok(Self { state: SortMergeJoinState::Init, sort_options, diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 85c39820d5..f941827dd0 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -48,6 +48,7 @@ use crate::{ use arrow::array::{Array, RecordBatch, RecordBatchOptions, StringViewArray}; use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays}; use arrow::datatypes::SchemaRef; +use datafusion_common::config::SpillCompression; use datafusion_common::{internal_datafusion_err, internal_err, DataFusionError, Result}; use datafusion_execution::disk_manager::RefCountedTempFile; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; @@ -258,6 +259,8 @@ impl ExternalSorter { batch_size: usize, sort_spill_reservation_bytes: usize, sort_in_place_threshold_bytes: usize, + // Configured via `datafusion.execution.spill_compression`. + spill_compression: SpillCompression, metrics: &ExecutionPlanMetricsSet, runtime: Arc<RuntimeEnv>, ) -> Result<Self> { @@ -274,7 +277,8 @@ impl ExternalSorter { Arc::clone(&runtime), metrics.spill_metrics.clone(), Arc::clone(&schema), - ); + ) + .with_compression_type(spill_compression); Ok(Self { schema, @@ -1173,6 +1177,7 @@ impl ExecutionPlan for SortExec { context.session_config().batch_size(), execution_options.sort_spill_reservation_bytes, execution_options.sort_in_place_threshold_bytes, + context.session_config().spill_compression(), &self.metrics_set, context.runtime_env(), )?; diff --git a/datafusion/physical-plan/src/spill/in_progress_spill_file.rs b/datafusion/physical-plan/src/spill/in_progress_spill_file.rs index 7617e0a22a..61d54bb738 100644 --- a/datafusion/physical-plan/src/spill/in_progress_spill_file.rs +++ b/datafusion/physical-plan/src/spill/in_progress_spill_file.rs @@ -67,6 +67,7 @@ impl InProgressSpillFile { self.writer = Some(IPCStreamWriter::new( in_progress_file.path(), schema.as_ref(), + self.spill_writer.compression, )?); // Update metrics diff --git a/datafusion/physical-plan/src/spill/mod.rs b/datafusion/physical-plan/src/spill/mod.rs index 1101616a41..a6dbce5370 100644 --- a/datafusion/physical-plan/src/spill/mod.rs +++ b/datafusion/physical-plan/src/spill/mod.rs @@ -30,9 +30,14 @@ use std::task::{Context, Poll}; use arrow::array::ArrayData; use arrow::datatypes::{Schema, SchemaRef}; -use arrow::ipc::{reader::StreamReader, writer::StreamWriter}; +use arrow::ipc::{ + reader::StreamReader, + writer::{IpcWriteOptions, StreamWriter}, + MetadataVersion, +}; use arrow::record_batch::RecordBatch; +use datafusion_common::config::SpillCompression; use datafusion_common::{exec_datafusion_err, DataFusionError, HashSet, Result}; use datafusion_common_runtime::SpawnedTask; use datafusion_execution::disk_manager::RefCountedTempFile; @@ -194,7 +199,8 @@ pub fn spill_record_batch_by_size( ) -> Result<()> { let mut offset = 0; let total_rows = batch.num_rows(); - let mut writer = IPCStreamWriter::new(&path, schema.as_ref())?; + let mut writer = + IPCStreamWriter::new(&path, schema.as_ref(), SpillCompression::Uncompressed)?; while offset < total_rows { let length = std::cmp::min(total_rows - offset, batch_size_rows); @@ -292,15 +298,27 @@ struct IPCStreamWriter { impl IPCStreamWriter { /// Create new writer - pub fn new(path: &Path, schema: &Schema) -> Result<Self> { + pub fn new( + path: &Path, + schema: &Schema, + compression_type: SpillCompression, + ) -> Result<Self> { let file = File::create(path).map_err(|e| { exec_datafusion_err!("Failed to create partition file at {path:?}: {e:?}") })?; + + let metadata_version = MetadataVersion::V5; + let alignment = 8; + let mut write_options = + IpcWriteOptions::try_new(alignment, false, metadata_version)?; + write_options = write_options.try_with_compression(compression_type.into())?; + + let writer = StreamWriter::try_new_with_options(file, schema, write_options)?; Ok(Self { num_batches: 0, num_rows: 0, num_bytes: 0, - writer: StreamWriter::try_new(file, schema)?, + writer, }) } @@ -332,7 +350,7 @@ mod tests { use crate::metrics::SpillMetrics; use crate::spill::spill_manager::SpillManager; use crate::test::build_table_i32; - use arrow::array::{Float64Array, Int32Array, ListArray, StringArray}; + use arrow::array::{ArrayRef, Float64Array, Int32Array, ListArray, StringArray}; use arrow::compute::cast; use arrow::datatypes::{DataType, Field, Int32Type, Schema}; use arrow::record_batch::RecordBatch; @@ -470,6 +488,113 @@ mod tests { Ok(()) } + fn build_compressible_batch() -> RecordBatch { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Int32, true), + ])); + + let a: ArrayRef = Arc::new(StringArray::from_iter_values(std::iter::repeat_n( + "repeated", 100, + ))); + let b: ArrayRef = Arc::new(Int32Array::from(vec![1; 100])); + let c: ArrayRef = Arc::new(Int32Array::from(vec![2; 100])); + + RecordBatch::try_new(schema, vec![a, b, c]).unwrap() + } + + async fn validate( + spill_manager: &SpillManager, + spill_file: RefCountedTempFile, + num_rows: usize, + schema: SchemaRef, + batch_count: usize, + ) -> Result<()> { + let spilled_rows = spill_manager.metrics.spilled_rows.value(); + assert_eq!(spilled_rows, num_rows); + + let stream = spill_manager.read_spill_as_stream(spill_file)?; + assert_eq!(stream.schema(), schema); + + let batches = collect(stream).await?; + assert_eq!(batches.len(), batch_count); + + Ok(()) + } + + #[tokio::test] + async fn test_spill_compression() -> Result<()> { + let batch = build_compressible_batch(); + let num_rows = batch.num_rows(); + let schema = batch.schema(); + let batch_count = 1; + let batches = [batch]; + + // Construct SpillManager + let env = Arc::new(RuntimeEnv::default()); + let uncompressed_metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0); + let lz4_metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0); + let zstd_metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0); + let uncompressed_spill_manager = SpillManager::new( + Arc::clone(&env), + uncompressed_metrics, + Arc::clone(&schema), + ); + let lz4_spill_manager = + SpillManager::new(Arc::clone(&env), lz4_metrics, Arc::clone(&schema)) + .with_compression_type(SpillCompression::Lz4Frame); + let zstd_spill_manager = + SpillManager::new(env, zstd_metrics, Arc::clone(&schema)) + .with_compression_type(SpillCompression::Zstd); + let uncompressed_spill_file = uncompressed_spill_manager + .spill_record_batch_and_finish(&batches, "Test")? + .unwrap(); + let lz4_spill_file = lz4_spill_manager + .spill_record_batch_and_finish(&batches, "Lz4_Test")? + .unwrap(); + let zstd_spill_file = zstd_spill_manager + .spill_record_batch_and_finish(&batches, "ZSTD_Test")? + .unwrap(); + assert!(uncompressed_spill_file.path().exists()); + assert!(lz4_spill_file.path().exists()); + assert!(zstd_spill_file.path().exists()); + + let lz4_spill_size = std::fs::metadata(lz4_spill_file.path())?.len(); + let zstd_spill_size = std::fs::metadata(zstd_spill_file.path())?.len(); + let uncompressed_spill_size = + std::fs::metadata(uncompressed_spill_file.path())?.len(); + + assert!(uncompressed_spill_size > lz4_spill_size); + assert!(uncompressed_spill_size > zstd_spill_size); + + validate( + &lz4_spill_manager, + lz4_spill_file, + num_rows, + Arc::clone(&schema), + batch_count, + ) + .await?; + validate( + &zstd_spill_manager, + zstd_spill_file, + num_rows, + Arc::clone(&schema), + batch_count, + ) + .await?; + validate( + &uncompressed_spill_manager, + uncompressed_spill_file, + num_rows, + schema, + batch_count, + ) + .await?; + Ok(()) + } + #[test] fn test_get_record_batch_memory_size() { // Create a simple record batch with two columns diff --git a/datafusion/physical-plan/src/spill/spill_manager.rs b/datafusion/physical-plan/src/spill/spill_manager.rs index 78cd47a8ba..89fd12d398 100644 --- a/datafusion/physical-plan/src/spill/spill_manager.rs +++ b/datafusion/physical-plan/src/spill/spill_manager.rs @@ -23,7 +23,7 @@ use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use datafusion_execution::runtime_env::RuntimeEnv; -use datafusion_common::Result; +use datafusion_common::{config::SpillCompression, Result}; use datafusion_execution::disk_manager::RefCountedTempFile; use datafusion_execution::SendableRecordBatchStream; @@ -44,7 +44,8 @@ pub struct SpillManager { schema: SchemaRef, /// Number of batches to buffer in memory during disk reads batch_read_buffer_capacity: usize, - // TODO: Add general-purpose compression options + /// general-purpose compression options + pub(crate) compression: SpillCompression, } impl SpillManager { @@ -54,9 +55,15 @@ impl SpillManager { metrics, schema, batch_read_buffer_capacity: 2, + compression: SpillCompression::default(), } } + pub fn with_compression_type(mut self, spill_compression: SpillCompression) -> Self { + self.compression = spill_compression; + self + } + /// Creates a temporary file for in-progress operations, returning an error /// message if file creation fails. The file can be used to append batches /// incrementally and then finish the file when done. diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index bf9ae20e7b..bc9336dcc6 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -263,6 +263,7 @@ datafusion.execution.skip_physical_aggregate_schema_check false datafusion.execution.soft_max_rows_per_output_file 50000000 datafusion.execution.sort_in_place_threshold_bytes 1048576 datafusion.execution.sort_spill_reservation_bytes 10485760 +datafusion.execution.spill_compression uncompressed datafusion.execution.split_file_groups_by_statistics false datafusion.execution.target_partitions 7 datafusion.execution.time_zone +00:00 @@ -375,6 +376,7 @@ datafusion.execution.skip_physical_aggregate_schema_check false When set to true datafusion.execution.soft_max_rows_per_output_file 50000000 Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max datafusion.execution.sort_in_place_threshold_bytes 1048576 When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. datafusion.execution.sort_spill_reservation_bytes 10485760 Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). +datafusion.execution.spill_compression uncompressed Sets the compression codec used when spilling data to disk. Since datafusion writes spill files using the Arrow IPC Stream format, only codecs supported by the Arrow IPC Stream Writer are allowed. Valid values are: uncompressed, lz4_frame, zstd. Note: lz4_frame offers faster (de)compression, but typically results in larger spill files. In contrast, zstd achieves higher compression ratios at the cost of slower (de)compression speed. datafusion.execution.split_file_groups_by_statistics false Attempt to eliminate sorts by packing & sorting files with non-overlapping statistics into the same file groups. Currently experimental datafusion.execution.target_partitions 7 Number of partitions for query execution. Increasing partitions can increase concurrency. Defaults to the number of CPU cores on the system datafusion.execution.time_zone +00:00 The default time zone Some functions, e.g. `EXTRACT(HOUR from SOME_TIME)`, shift the underlying datetime according to this time zone, and then extract the hour diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index b55e63293f..23a35c896d 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -83,6 +83,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.execution.parquet.maximum_buffered_record_batches_per_stream | 2 | (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile [...] | datafusion.execution.planning_concurrency | 0 | Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system [...] | datafusion.execution.skip_physical_aggregate_schema_check | false | When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step. [...] +| datafusion.execution.spill_compression | uncompressed | Sets the compression codec used when spilling data to disk. Since datafusion writes spill files using the Arrow IPC Stream format, only codecs supported by the Arrow IPC Stream Writer are allowed. Valid values are: uncompressed, lz4_frame, zstd. Note: lz4_frame offers faster (de)compression, but typically results in larger spill files. In contrast, zstd achieves higher compression rati [...] | datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's [...] | datafusion.execution.sort_in_place_threshold_bytes | 1048576 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. [...] | datafusion.execution.meta_fetch_concurrency | 32 | Number of files to read in parallel when inferring schema and statistics [...] --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org