This is an automated email from the ASF dual-hosted git repository.

ytyou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 3c4e39ac0c Add compression option to SpillManager (#16268)
3c4e39ac0c is described below

commit 3c4e39ac0cf83bd8ead45722a5873bac731b53f1
Author: ding-young <lsyh...@snu.ac.kr>
AuthorDate: Fri Jun 20 16:27:31 2025 +0900

    Add compression option to SpillManager (#16268)
    
    * Add compression option to SpillManager
    
    * add test case for spill compression
    
    * Chore: fix docs & fmt
    
    * set spill compression option with separate api
    
    * chore
    
    * chore: add comment
---
 Cargo.lock                                         |   1 +
 Cargo.toml                                         |   1 +
 datafusion/common/src/config.rs                    |  67 ++++++++++
 datafusion/core/tests/memory_limit/mod.rs          |  82 ++++++++++++-
 datafusion/execution/src/config.rs                 |  15 ++-
 .../physical-plan/src/aggregates/row_hash.rs       |   3 +-
 .../physical-plan/src/joins/sort_merge_join.rs     |   7 +-
 datafusion/physical-plan/src/sorts/sort.rs         |   7 +-
 .../src/spill/in_progress_spill_file.rs            |   1 +
 datafusion/physical-plan/src/spill/mod.rs          | 135 ++++++++++++++++++++-
 .../physical-plan/src/spill/spill_manager.rs       |  11 +-
 .../sqllogictest/test_files/information_schema.slt |   2 +
 docs/source/user-guide/configs.md                  |   1 +
 13 files changed, 319 insertions(+), 14 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index e2e593e625..194483b7ab 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -398,6 +398,7 @@ dependencies = [
  "arrow-schema",
  "flatbuffers",
  "lz4_flex",
+ "zstd",
 ]
 
 [[package]]
diff --git a/Cargo.toml b/Cargo.toml
index f2cd6f72c7..62b56c0939 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -98,6 +98,7 @@ arrow-flight = { version = "55.1.0", features = [
 ] }
 arrow-ipc = { version = "55.0.0", default-features = false, features = [
     "lz4",
+    "zstd",
 ] }
 arrow-ord = { version = "55.0.0", default-features = false }
 arrow-schema = { version = "55.0.0", default-features = false }
diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index c6ac2f0b50..def64517ce 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -17,6 +17,8 @@
 
 //! Runtime configuration, via [`ConfigOptions`]
 
+use arrow_ipc::CompressionType;
+
 use crate::error::_config_err;
 use crate::parsers::CompressionTypeVariant;
 use crate::utils::get_available_parallelism;
@@ -274,6 +276,61 @@ config_namespace! {
     }
 }
 
+#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
+pub enum SpillCompression {
+    Zstd,
+    Lz4Frame,
+    #[default]
+    Uncompressed,
+}
+
+impl FromStr for SpillCompression {
+    type Err = DataFusionError;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s.to_ascii_lowercase().as_str() {
+            "zstd" => Ok(Self::Zstd),
+            "lz4_frame" => Ok(Self::Lz4Frame),
+            "uncompressed" | "" => Ok(Self::Uncompressed),
+            other => Err(DataFusionError::Configuration(format!(
+                "Invalid Spill file compression type: {other}. Expected one 
of: zstd, lz4_frame, uncompressed"
+            ))),
+        }
+    }
+}
+
+impl ConfigField for SpillCompression {
+    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) 
{
+        v.some(key, self, description)
+    }
+
+    fn set(&mut self, _: &str, value: &str) -> Result<()> {
+        *self = SpillCompression::from_str(value)?;
+        Ok(())
+    }
+}
+
+impl Display for SpillCompression {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let str = match self {
+            Self::Zstd => "zstd",
+            Self::Lz4Frame => "lz4_frame",
+            Self::Uncompressed => "uncompressed",
+        };
+        write!(f, "{str}")
+    }
+}
+
+impl From<SpillCompression> for Option<CompressionType> {
+    fn from(c: SpillCompression) -> Self {
+        match c {
+            SpillCompression::Zstd => Some(CompressionType::ZSTD),
+            SpillCompression::Lz4Frame => Some(CompressionType::LZ4_FRAME),
+            SpillCompression::Uncompressed => None,
+        }
+    }
+}
+
 config_namespace! {
     /// Options related to query execution
     ///
@@ -330,6 +387,16 @@ config_namespace! {
         /// the new schema verification step.
         pub skip_physical_aggregate_schema_check: bool, default = false
 
+        /// Sets the compression codec used when spilling data to disk.
+        ///
+        /// Since datafusion writes spill files using the Arrow IPC Stream format,
+        /// only codecs supported by the Arrow IPC Stream Writer are allowed.
+        /// Valid values are: uncompressed, lz4_frame, zstd.
+        /// Note: lz4_frame offers faster (de)compression, but typically results in
+        /// larger spill files. In contrast, zstd achieves
+        /// higher compression ratios at the cost of slower (de)compression speed.
+        pub spill_compression: SpillCompression, default = 
SpillCompression::Uncompressed
+
         /// Specifies the reserved memory for each spillable sort operation to
         /// facilitate an in-memory merge.
         ///
diff --git a/datafusion/core/tests/memory_limit/mod.rs 
b/datafusion/core/tests/memory_limit/mod.rs
index 4ac779d9c2..2b262d4326 100644
--- a/datafusion/core/tests/memory_limit/mod.rs
+++ b/datafusion/core/tests/memory_limit/mod.rs
@@ -28,6 +28,7 @@ use arrow::compute::SortOptions;
 use arrow::datatypes::{Int32Type, SchemaRef};
 use arrow_schema::{DataType, Field, Schema};
 use datafusion::assert_batches_eq;
+use datafusion::config::SpillCompression;
 use datafusion::datasource::memory::MemorySourceConfig;
 use datafusion::datasource::source::DataSourceExec;
 use datafusion::datasource::{MemTable, TableProvider};
@@ -545,10 +546,11 @@ async fn test_external_sort_zero_merge_reservation() {
 // Tests for disk limit (`max_temp_directory_size` in `DiskManager`)
 // ------------------------------------------------------------------
 
-// Create a new `SessionContext` with speicified disk limit and memory pool 
limit
+// Create a new `SessionContext` with the specified disk limit, memory pool limit, and spill compression codec
 async fn setup_context(
     disk_limit: u64,
     memory_pool_limit: usize,
+    spill_compression: SpillCompression,
 ) -> Result<SessionContext> {
     let disk_manager = DiskManagerBuilder::default()
         .with_mode(DiskManagerMode::OsTmpDirectory)
@@ -570,6 +572,7 @@ async fn setup_context(
     let config = SessionConfig::new()
         .with_sort_spill_reservation_bytes(64 * 1024) // 256KB
         .with_sort_in_place_threshold_bytes(0)
+        .with_spill_compression(spill_compression)
         .with_batch_size(64) // To reduce test memory usage
         .with_target_partitions(1);
 
@@ -580,7 +583,8 @@ async fn setup_context(
 /// (specified by `max_temp_directory_size` in `DiskManager`)
 #[tokio::test]
 async fn test_disk_spill_limit_reached() -> Result<()> {
-    let ctx = setup_context(1024 * 1024, 1024 * 1024).await?; // 1MB disk 
limit, 1MB memory limit
+    let spill_compression = SpillCompression::Uncompressed;
+    let ctx = setup_context(1024 * 1024, 1024 * 1024, 
spill_compression).await?; // 1MB disk limit, 1MB memory limit
 
     let df = ctx
         .sql("select * from generate_series(1, 1000000000000) as t1(v1) order 
by v1")
@@ -602,7 +606,8 @@ async fn test_disk_spill_limit_reached() -> Result<()> {
 #[tokio::test]
 async fn test_disk_spill_limit_not_reached() -> Result<()> {
     let disk_spill_limit = 1024 * 1024; // 1MB
-    let ctx = setup_context(disk_spill_limit, 128 * 1024).await?; // 1MB disk 
limit, 128KB memory limit
+    let spill_compression = SpillCompression::Uncompressed;
+    let ctx = setup_context(disk_spill_limit, 128 * 1024, 
spill_compression).await?; // 1MB disk limit, 128KB memory limit
 
     let df = ctx
         .sql("select * from generate_series(1, 10000) as t1(v1) order by v1")
@@ -630,6 +635,77 @@ async fn test_disk_spill_limit_not_reached() -> Result<()> 
{
     Ok(())
 }
 
+/// External query should succeed using zstd as spill compression codec,
+/// and all temporary spill files are properly cleaned up after execution.
+/// Note: This test does not inspect file contents (e.g. magic number),
+/// as spill files are automatically deleted on drop.
+#[tokio::test]
+async fn test_spill_file_compressed_with_zstd() -> Result<()> {
+    let disk_spill_limit = 1024 * 1024; // 1MB
+    let spill_compression = SpillCompression::Zstd;
+    let ctx = setup_context(disk_spill_limit, 128 * 1024, 
spill_compression).await?; // 1MB disk limit, 128KB memory limit, zstd
+
+    let df = ctx
+        .sql("select * from generate_series(1, 100000) as t1(v1) order by v1")
+        .await
+        .unwrap();
+    let plan = df.create_physical_plan().await.unwrap();
+
+    let task_ctx = ctx.task_ctx();
+    let _ = collect_batches(Arc::clone(&plan), task_ctx)
+        .await
+        .expect("Query execution failed");
+
+    let spill_count = plan.metrics().unwrap().spill_count().unwrap();
+    let spilled_bytes = plan.metrics().unwrap().spilled_bytes().unwrap();
+
+    println!("spill count {spill_count}");
+    assert!(spill_count > 0);
+    assert!((spilled_bytes as u64) < disk_spill_limit);
+
+    // Verify that all temporary files have been properly cleaned up by 
checking
+    // that the total disk usage tracked by the disk manager is zero
+    let current_disk_usage = ctx.runtime_env().disk_manager.used_disk_space();
+    assert_eq!(current_disk_usage, 0);
+
+    Ok(())
+}
+
+/// External query should succeed using lz4_frame as spill compression codec,
+/// and all temporary spill files are properly cleaned up after execution.
+/// Note: This test does not inspect file contents (e.g. magic number),
+/// as spill files are automatically deleted on drop.
+#[tokio::test]
+async fn test_spill_file_compressed_with_lz4_frame() -> Result<()> {
+    let disk_spill_limit = 1024 * 1024; // 1MB
+    let spill_compression = SpillCompression::Lz4Frame;
+    let ctx = setup_context(disk_spill_limit, 128 * 1024, 
spill_compression).await?; // 1MB disk limit, 128KB memory limit, lz4_frame
+
+    let df = ctx
+        .sql("select * from generate_series(1, 100000) as t1(v1) order by v1")
+        .await
+        .unwrap();
+    let plan = df.create_physical_plan().await.unwrap();
+
+    let task_ctx = ctx.task_ctx();
+    let _ = collect_batches(Arc::clone(&plan), task_ctx)
+        .await
+        .expect("Query execution failed");
+
+    let spill_count = plan.metrics().unwrap().spill_count().unwrap();
+    let spilled_bytes = plan.metrics().unwrap().spilled_bytes().unwrap();
+
+    println!("spill count {spill_count}");
+    assert!(spill_count > 0);
+    assert!((spilled_bytes as u64) < disk_spill_limit);
+
+    // Verify that all temporary files have been properly cleaned up by 
checking
+    // that the total disk usage tracked by the disk manager is zero
+    let current_disk_usage = ctx.runtime_env().disk_manager.used_disk_space();
+    assert_eq!(current_disk_usage, 0);
+
+    Ok(())
+}
 /// Run the query with the specified memory limit,
 /// and verifies the expected errors are returned
 #[derive(Clone, Debug)]
diff --git a/datafusion/execution/src/config.rs 
b/datafusion/execution/src/config.rs
index 1e00a1ce47..c1ee2820c0 100644
--- a/datafusion/execution/src/config.rs
+++ b/datafusion/execution/src/config.rs
@@ -23,7 +23,7 @@ use std::{
 };
 
 use datafusion_common::{
-    config::{ConfigExtension, ConfigOptions},
+    config::{ConfigExtension, ConfigOptions, SpillCompression},
     Result, ScalarValue,
 };
 
@@ -258,6 +258,11 @@ impl SessionConfig {
         self.options.execution.collect_statistics
     }
 
+    /// Returns the compression codec used for spill files
+    pub fn spill_compression(&self) -> SpillCompression {
+        self.options.execution.spill_compression
+    }
+
     /// Selects a name for the default catalog and schema
     pub fn with_default_catalog_and_schema(
         mut self,
@@ -421,6 +426,14 @@ impl SessionConfig {
         self
     }
 
+    /// Set the compression codec [`spill_compression`] used when spilling data to disk.
+    ///
+    /// [`spill_compression`]: datafusion_common::config::ExecutionOptions::spill_compression
+    pub fn with_spill_compression(mut self, spill_compression: 
SpillCompression) -> Self {
+        self.options.execution.spill_compression = spill_compression;
+        self
+    }
+
     /// Set the size of [`sort_in_place_threshold_bytes`] to control
     /// how sort does things.
     ///
diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs 
b/datafusion/physical-plan/src/aggregates/row_hash.rs
index 405c0b2059..1d659d7280 100644
--- a/datafusion/physical-plan/src/aggregates/row_hash.rs
+++ b/datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -552,7 +552,8 @@ impl GroupedHashAggregateStream {
             context.runtime_env(),
             metrics::SpillMetrics::new(&agg.metrics, partition),
             Arc::clone(&partial_agg_schema),
-        );
+        )
+        .with_compression_type(context.session_config().spill_compression());
 
         let spill_state = SpillState {
             spills: vec![],
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs 
b/datafusion/physical-plan/src/joins/sort_merge_join.rs
index 4d635948ed..a8c209a492 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs
@@ -61,6 +61,7 @@ use arrow::compute::{
 use arrow::datatypes::{DataType, SchemaRef, TimeUnit};
 use arrow::error::ArrowError;
 use arrow::ipc::reader::StreamReader;
+use datafusion_common::config::SpillCompression;
 use datafusion_common::{
     exec_err, internal_err, not_impl_err, plan_err, DataFusionError, HashSet, 
JoinSide,
     JoinType, NullEquality, Result,
@@ -500,6 +501,7 @@ impl ExecutionPlan for SortMergeJoinExec {
 
         // create join stream
         Ok(Box::pin(SortMergeJoinStream::try_new(
+            context.session_config().spill_compression(),
             Arc::clone(&self.schema),
             self.sort_options.clone(),
             self.null_equality,
@@ -1324,6 +1326,8 @@ impl Stream for SortMergeJoinStream {
 impl SortMergeJoinStream {
     #[allow(clippy::too_many_arguments)]
     pub fn try_new(
+        // Configured via `datafusion.execution.spill_compression`.
+        spill_compression: SpillCompression,
         schema: SchemaRef,
         sort_options: Vec<SortOptions>,
         null_equality: NullEquality,
@@ -1344,7 +1348,8 @@ impl SortMergeJoinStream {
             Arc::clone(&runtime_env),
             join_metrics.spill_metrics.clone(),
             Arc::clone(&buffered_schema),
-        );
+        )
+        .with_compression_type(spill_compression);
         Ok(Self {
             state: SortMergeJoinState::Init,
             sort_options,
diff --git a/datafusion/physical-plan/src/sorts/sort.rs 
b/datafusion/physical-plan/src/sorts/sort.rs
index 85c39820d5..f941827dd0 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -48,6 +48,7 @@ use crate::{
 use arrow::array::{Array, RecordBatch, RecordBatchOptions, StringViewArray};
 use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays};
 use arrow::datatypes::SchemaRef;
+use datafusion_common::config::SpillCompression;
 use datafusion_common::{internal_datafusion_err, internal_err, 
DataFusionError, Result};
 use datafusion_execution::disk_manager::RefCountedTempFile;
 use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
@@ -258,6 +259,8 @@ impl ExternalSorter {
         batch_size: usize,
         sort_spill_reservation_bytes: usize,
         sort_in_place_threshold_bytes: usize,
+        // Configured via `datafusion.execution.spill_compression`.
+        spill_compression: SpillCompression,
         metrics: &ExecutionPlanMetricsSet,
         runtime: Arc<RuntimeEnv>,
     ) -> Result<Self> {
@@ -274,7 +277,8 @@ impl ExternalSorter {
             Arc::clone(&runtime),
             metrics.spill_metrics.clone(),
             Arc::clone(&schema),
-        );
+        )
+        .with_compression_type(spill_compression);
 
         Ok(Self {
             schema,
@@ -1173,6 +1177,7 @@ impl ExecutionPlan for SortExec {
                     context.session_config().batch_size(),
                     execution_options.sort_spill_reservation_bytes,
                     execution_options.sort_in_place_threshold_bytes,
+                    context.session_config().spill_compression(),
                     &self.metrics_set,
                     context.runtime_env(),
                 )?;
diff --git a/datafusion/physical-plan/src/spill/in_progress_spill_file.rs 
b/datafusion/physical-plan/src/spill/in_progress_spill_file.rs
index 7617e0a22a..61d54bb738 100644
--- a/datafusion/physical-plan/src/spill/in_progress_spill_file.rs
+++ b/datafusion/physical-plan/src/spill/in_progress_spill_file.rs
@@ -67,6 +67,7 @@ impl InProgressSpillFile {
                 self.writer = Some(IPCStreamWriter::new(
                     in_progress_file.path(),
                     schema.as_ref(),
+                    self.spill_writer.compression,
                 )?);
 
                 // Update metrics
diff --git a/datafusion/physical-plan/src/spill/mod.rs 
b/datafusion/physical-plan/src/spill/mod.rs
index 1101616a41..a6dbce5370 100644
--- a/datafusion/physical-plan/src/spill/mod.rs
+++ b/datafusion/physical-plan/src/spill/mod.rs
@@ -30,9 +30,14 @@ use std::task::{Context, Poll};
 
 use arrow::array::ArrayData;
 use arrow::datatypes::{Schema, SchemaRef};
-use arrow::ipc::{reader::StreamReader, writer::StreamWriter};
+use arrow::ipc::{
+    reader::StreamReader,
+    writer::{IpcWriteOptions, StreamWriter},
+    MetadataVersion,
+};
 use arrow::record_batch::RecordBatch;
 
+use datafusion_common::config::SpillCompression;
 use datafusion_common::{exec_datafusion_err, DataFusionError, HashSet, Result};
 use datafusion_common_runtime::SpawnedTask;
 use datafusion_execution::disk_manager::RefCountedTempFile;
@@ -194,7 +199,8 @@ pub fn spill_record_batch_by_size(
 ) -> Result<()> {
     let mut offset = 0;
     let total_rows = batch.num_rows();
-    let mut writer = IPCStreamWriter::new(&path, schema.as_ref())?;
+    let mut writer =
+        IPCStreamWriter::new(&path, schema.as_ref(), 
SpillCompression::Uncompressed)?;
 
     while offset < total_rows {
         let length = std::cmp::min(total_rows - offset, batch_size_rows);
@@ -292,15 +298,27 @@ struct IPCStreamWriter {
 
 impl IPCStreamWriter {
     /// Create new writer
-    pub fn new(path: &Path, schema: &Schema) -> Result<Self> {
+    pub fn new(
+        path: &Path,
+        schema: &Schema,
+        compression_type: SpillCompression,
+    ) -> Result<Self> {
         let file = File::create(path).map_err(|e| {
             exec_datafusion_err!("Failed to create partition file at {path:?}: 
{e:?}")
         })?;
+
+        let metadata_version = MetadataVersion::V5;
+        let alignment = 8;
+        let mut write_options =
+            IpcWriteOptions::try_new(alignment, false, metadata_version)?;
+        write_options = 
write_options.try_with_compression(compression_type.into())?;
+
+        let writer = StreamWriter::try_new_with_options(file, schema, 
write_options)?;
         Ok(Self {
             num_batches: 0,
             num_rows: 0,
             num_bytes: 0,
-            writer: StreamWriter::try_new(file, schema)?,
+            writer,
         })
     }
 
@@ -332,7 +350,7 @@ mod tests {
     use crate::metrics::SpillMetrics;
     use crate::spill::spill_manager::SpillManager;
     use crate::test::build_table_i32;
-    use arrow::array::{Float64Array, Int32Array, ListArray, StringArray};
+    use arrow::array::{ArrayRef, Float64Array, Int32Array, ListArray, 
StringArray};
     use arrow::compute::cast;
     use arrow::datatypes::{DataType, Field, Int32Type, Schema};
     use arrow::record_batch::RecordBatch;
@@ -470,6 +488,113 @@ mod tests {
         Ok(())
     }
 
+    fn build_compressible_batch() -> RecordBatch {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Utf8, false),
+            Field::new("b", DataType::Int32, false),
+            Field::new("c", DataType::Int32, true),
+        ]));
+
+        let a: ArrayRef = 
Arc::new(StringArray::from_iter_values(std::iter::repeat_n(
+            "repeated", 100,
+        )));
+        let b: ArrayRef = Arc::new(Int32Array::from(vec![1; 100]));
+        let c: ArrayRef = Arc::new(Int32Array::from(vec![2; 100]));
+
+        RecordBatch::try_new(schema, vec![a, b, c]).unwrap()
+    }
+
+    async fn validate(
+        spill_manager: &SpillManager,
+        spill_file: RefCountedTempFile,
+        num_rows: usize,
+        schema: SchemaRef,
+        batch_count: usize,
+    ) -> Result<()> {
+        let spilled_rows = spill_manager.metrics.spilled_rows.value();
+        assert_eq!(spilled_rows, num_rows);
+
+        let stream = spill_manager.read_spill_as_stream(spill_file)?;
+        assert_eq!(stream.schema(), schema);
+
+        let batches = collect(stream).await?;
+        assert_eq!(batches.len(), batch_count);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_spill_compression() -> Result<()> {
+        let batch = build_compressible_batch();
+        let num_rows = batch.num_rows();
+        let schema = batch.schema();
+        let batch_count = 1;
+        let batches = [batch];
+
+        // Construct SpillManager
+        let env = Arc::new(RuntimeEnv::default());
+        let uncompressed_metrics = 
SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
+        let lz4_metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 
0);
+        let zstd_metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 
0);
+        let uncompressed_spill_manager = SpillManager::new(
+            Arc::clone(&env),
+            uncompressed_metrics,
+            Arc::clone(&schema),
+        );
+        let lz4_spill_manager =
+            SpillManager::new(Arc::clone(&env), lz4_metrics, 
Arc::clone(&schema))
+                .with_compression_type(SpillCompression::Lz4Frame);
+        let zstd_spill_manager =
+            SpillManager::new(env, zstd_metrics, Arc::clone(&schema))
+                .with_compression_type(SpillCompression::Zstd);
+        let uncompressed_spill_file = uncompressed_spill_manager
+            .spill_record_batch_and_finish(&batches, "Test")?
+            .unwrap();
+        let lz4_spill_file = lz4_spill_manager
+            .spill_record_batch_and_finish(&batches, "Lz4_Test")?
+            .unwrap();
+        let zstd_spill_file = zstd_spill_manager
+            .spill_record_batch_and_finish(&batches, "ZSTD_Test")?
+            .unwrap();
+        assert!(uncompressed_spill_file.path().exists());
+        assert!(lz4_spill_file.path().exists());
+        assert!(zstd_spill_file.path().exists());
+
+        let lz4_spill_size = std::fs::metadata(lz4_spill_file.path())?.len();
+        let zstd_spill_size = std::fs::metadata(zstd_spill_file.path())?.len();
+        let uncompressed_spill_size =
+            std::fs::metadata(uncompressed_spill_file.path())?.len();
+
+        assert!(uncompressed_spill_size > lz4_spill_size);
+        assert!(uncompressed_spill_size > zstd_spill_size);
+
+        validate(
+            &lz4_spill_manager,
+            lz4_spill_file,
+            num_rows,
+            Arc::clone(&schema),
+            batch_count,
+        )
+        .await?;
+        validate(
+            &zstd_spill_manager,
+            zstd_spill_file,
+            num_rows,
+            Arc::clone(&schema),
+            batch_count,
+        )
+        .await?;
+        validate(
+            &uncompressed_spill_manager,
+            uncompressed_spill_file,
+            num_rows,
+            schema,
+            batch_count,
+        )
+        .await?;
+        Ok(())
+    }
+
     #[test]
     fn test_get_record_batch_memory_size() {
         // Create a simple record batch with two columns
diff --git a/datafusion/physical-plan/src/spill/spill_manager.rs 
b/datafusion/physical-plan/src/spill/spill_manager.rs
index 78cd47a8ba..89fd12d398 100644
--- a/datafusion/physical-plan/src/spill/spill_manager.rs
+++ b/datafusion/physical-plan/src/spill/spill_manager.rs
@@ -23,7 +23,7 @@ use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
 use datafusion_execution::runtime_env::RuntimeEnv;
 
-use datafusion_common::Result;
+use datafusion_common::{config::SpillCompression, Result};
 use datafusion_execution::disk_manager::RefCountedTempFile;
 use datafusion_execution::SendableRecordBatchStream;
 
@@ -44,7 +44,8 @@ pub struct SpillManager {
     schema: SchemaRef,
     /// Number of batches to buffer in memory during disk reads
     batch_read_buffer_capacity: usize,
-    // TODO: Add general-purpose compression options
+    /// General-purpose compression codec used when writing spill files
+    pub(crate) compression: SpillCompression,
 }
 
 impl SpillManager {
@@ -54,9 +55,15 @@ impl SpillManager {
             metrics,
             schema,
             batch_read_buffer_capacity: 2,
+            compression: SpillCompression::default(),
         }
     }
 
+    pub fn with_compression_type(mut self, spill_compression: 
SpillCompression) -> Self {
+        self.compression = spill_compression;
+        self
+    }
+
     /// Creates a temporary file for in-progress operations, returning an error
     /// message if file creation fails. The file can be used to append batches
     /// incrementally and then finish the file when done.
diff --git a/datafusion/sqllogictest/test_files/information_schema.slt 
b/datafusion/sqllogictest/test_files/information_schema.slt
index bf9ae20e7b..bc9336dcc6 100644
--- a/datafusion/sqllogictest/test_files/information_schema.slt
+++ b/datafusion/sqllogictest/test_files/information_schema.slt
@@ -263,6 +263,7 @@ datafusion.execution.skip_physical_aggregate_schema_check 
false
 datafusion.execution.soft_max_rows_per_output_file 50000000
 datafusion.execution.sort_in_place_threshold_bytes 1048576
 datafusion.execution.sort_spill_reservation_bytes 10485760
+datafusion.execution.spill_compression uncompressed
 datafusion.execution.split_file_groups_by_statistics false
 datafusion.execution.target_partitions 7
 datafusion.execution.time_zone +00:00
@@ -375,6 +376,7 @@ datafusion.execution.skip_physical_aggregate_schema_check 
false When set to true
 datafusion.execution.soft_max_rows_per_output_file 50000000 Target number of 
rows in output files when writing multiple. This is a soft max, so it can be 
exceeded slightly. There also will be one file smaller than the limit if the 
total number of rows written is not roughly divisible by the soft max
 datafusion.execution.sort_in_place_threshold_bytes 1048576 When sorting, below 
what size should data be concatenated and sorted in a single RecordBatch rather 
than sorted in batches and merged.
 datafusion.execution.sort_spill_reservation_bytes 10485760 Specifies the 
reserved memory for each spillable sort operation to facilitate an in-memory 
merge. When a sort operation spills to disk, the in-memory data must be sorted 
and merged before being written to a file. This setting reserves a specific 
amount of memory for that in-memory sort/merge process. Note: This setting is 
irrelevant if the sort operation cannot spill (i.e., if there's no 
`DiskManager` configured).
+datafusion.execution.spill_compression uncompressed Sets the compression codec 
used when spilling data to disk. Since datafusion writes spill files using the 
Arrow IPC Stream format, only codecs supported by the Arrow IPC Stream Writer 
are allowed. Valid values are: uncompressed, lz4_frame, zstd. Note: lz4_frame 
offers faster (de)compression, but typically results in larger spill files. In 
contrast, zstd achieves higher compression ratios at the cost of slower 
(de)compression speed.
 datafusion.execution.split_file_groups_by_statistics false Attempt to 
eliminate sorts by packing & sorting files with non-overlapping statistics into 
the same file groups. Currently experimental
 datafusion.execution.target_partitions 7 Number of partitions for query 
execution. Increasing partitions can increase concurrency. Defaults to the 
number of CPU cores on the system
 datafusion.execution.time_zone +00:00 The default time zone Some functions, 
e.g. `EXTRACT(HOUR from SOME_TIME)`, shift the underlying datetime according to 
this time zone, and then extract the hour
diff --git a/docs/source/user-guide/configs.md 
b/docs/source/user-guide/configs.md
index b55e63293f..23a35c896d 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -83,6 +83,7 @@ Environment variables are read during `SessionConfig` 
initialisation so they mus
 | datafusion.execution.parquet.maximum_buffered_record_batches_per_stream | 2  
                       | (writing) By default parallel parquet writer is tuned 
for minimum memory usage in a streaming execution plan. You may see a 
performance benefit when writing large parquet files by increasing 
maximum_parallel_row_group_writers and 
maximum_buffered_record_batches_per_stream if your system has idle cores and 
can tolerate additional memory usage. Boosting these values is likely 
worthwhile  [...]
 | datafusion.execution.planning_concurrency                               | 0  
                       | Fan-out during initial physical planning. This is 
mostly use to plan `UNION` children in parallel. Defaults to the number of CPU 
cores on the system                                                             
                                                                                
                                                                                
                    [...]
 | datafusion.execution.skip_physical_aggregate_schema_check               | 
false                     | When set to true, skips verifying that the schema 
produced by planning the input of `LogicalPlan::Aggregate` exactly matches the 
schema of the input plan. When set to false, if the schema does not match 
exactly (including nullability and metadata), a planning error will be raised. 
This is used to workaround bugs in the planner that are now caught by the new 
schema verification step.    [...]
+| datafusion.execution.spill_compression                                  | 
uncompressed              | Sets the compression codec used when spilling data 
to disk. Since datafusion writes spill files using the Arrow IPC Stream format, 
only codecs supported by the Arrow IPC Stream Writer are allowed. Valid values 
are: uncompressed, lz4_frame, zstd. Note: lz4_frame offers faster 
(de)compression, but typically results in larger spill files. In contrast, zstd 
achieves higher compression rati [...]
 | datafusion.execution.sort_spill_reservation_bytes                       | 
10485760                  | Specifies the reserved memory for each spillable 
sort operation to facilitate an in-memory merge. When a sort operation spills 
to disk, the in-memory data must be sorted and merged before being written to a 
file. This setting reserves a specific amount of memory for that in-memory 
sort/merge process. Note: This setting is irrelevant if the sort operation 
cannot spill (i.e., if there's  [...]
 | datafusion.execution.sort_in_place_threshold_bytes                      | 
1048576                   | When sorting, below what size should data be 
concatenated and sorted in a single RecordBatch rather than sorted in batches 
and merged.                                                                     
                                                                                
                                                                                
                          [...]
 | datafusion.execution.meta_fetch_concurrency                             | 32 
                       | Number of files to read in parallel when inferring 
schema and statistics                                                           
                                                                                
                                                                                
                                                                                
                  [...]


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to