This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch use_lz4
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git

commit 9c88bc4115260ff55cda6484fae357496fe6fb7b
Author: Daniël Heres <[email protected]>
AuthorDate: Thu Dec 22 09:56:27 2022 +0100

    Use lz4 for shuffle compression
---
 ballista/executor/src/flight_service.rs | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/ballista/executor/src/flight_service.rs b/ballista/executor/src/flight_service.rs
index d12686ed..be762b1d 100644
--- a/ballista/executor/src/flight_service.rs
+++ b/ballista/executor/src/flight_service.rs
@@ -21,6 +21,7 @@ use std::convert::TryFrom;
 use std::fs::File;
 use std::pin::Pin;
 
+use arrow::ipc::CompressionType;
 use arrow_flight::SchemaAsIpc;
 use ballista_core::error::BallistaError;
 use ballista_core::serde::decode_protobuf;
@@ -228,7 +229,10 @@ async fn stream_flight_data<T>(
 where
     T: Read + Seek,
 {
-    let options = arrow::ipc::writer::IpcWriteOptions::default();
+    let options = arrow::ipc::writer::IpcWriteOptions::default()
+        .try_with_compression(Some(CompressionType::LZ4_FRAME)).map_err(
+            |err| Status::internal(format!("Couldn't create writer: {}", 
err.to_string()))
+    )?;
     let schema_flight_data = SchemaAsIpc::new(reader.schema().as_ref(), 
&options).into();
     send_response(&tx, Ok(schema_flight_data)).await?;
 

Reply via email to