This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch use_lz4_compression
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git

commit 92e9a3c93cb0a4364d81492e5edcc867029a5c79
Author: Daniël Heres <[email protected]>
AuthorDate: Mon Nov 27 18:45:36 2023 +0100

    Use lz4 compression for shuffle files and streams
---
 ballista/core/src/utils.rs              | 9 ++++++++-
 ballista/executor/src/flight_service.rs | 5 ++++-
 2 files changed, 12 insertions(+), 2 deletions(-)

diff --git a/ballista/core/src/utils.rs b/ballista/core/src/utils.rs
index 3252ad72..e16c1b4c 100644
--- a/ballista/core/src/utils.rs
+++ b/ballista/core/src/utils.rs
@@ -25,6 +25,8 @@ use crate::serde::scheduler::PartitionStats;
 
 use async_trait::async_trait;
 use datafusion::arrow::datatypes::Schema;
+use datafusion::arrow::ipc::writer::IpcWriteOptions;
+use datafusion::arrow::ipc::CompressionType;
 use datafusion::arrow::{ipc::writer::FileWriter, record_batch::RecordBatch};
 use datafusion::datasource::physical_plan::{CsvExec, ParquetExec};
 use datafusion::error::DataFusionError;
@@ -82,7 +84,12 @@ pub async fn write_stream_to_disk(
     let mut num_rows = 0;
     let mut num_batches = 0;
     let mut num_bytes = 0;
-    let mut writer = FileWriter::try_new(file, stream.schema().as_ref())?;
+
+    let options = IpcWriteOptions::default()
+        .try_with_compression(Some(CompressionType::LZ4_FRAME))?;
+
+    let mut writer =
+        FileWriter::try_new_with_options(file, stream.schema().as_ref(), options)?;
 
     while let Some(result) = stream.next().await {
         let batch = result?;
diff --git a/ballista/executor/src/flight_service.rs b/ballista/executor/src/flight_service.rs
index 4a62bc5f..7237ea0f 100644
--- a/ballista/executor/src/flight_service.rs
+++ b/ballista/executor/src/flight_service.rs
@@ -21,6 +21,7 @@ use std::convert::TryFrom;
 use std::fs::File;
 use std::pin::Pin;
 
+use arrow::ipc::CompressionType;
 use arrow_flight::SchemaAsIpc;
 use ballista_core::error::BallistaError;
 use ballista_core::serde::decode_protobuf;
@@ -231,7 +232,9 @@ async fn stream_flight_data<T>(
 where
     T: Read + Seek,
 {
-    let options = arrow::ipc::writer::IpcWriteOptions::default();
+    let options = arrow::ipc::writer::IpcWriteOptions::default()
+        .try_with_compression(Some(CompressionType::LZ4_FRAME))
+        .map_err(|x| from_arrow_err(&x))?;
     let schema_flight_data = SchemaAsIpc::new(reader.schema().as_ref(), &options).into();
     send_response(&tx, Ok(schema_flight_data)).await?;
 

Reply via email to