This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new 13456466 Use lz4 compression for shuffle files & flight stream,
refactoring / improvements (#920)
13456466 is described below
commit 134564666c172d923c278390bde97a792a6fee52
Author: Daniël Heres <[email protected]>
AuthorDate: Thu Nov 30 18:02:17 2023 +0000
Use lz4 compression for shuffle files & flight stream, refactoring /
improvements (#920)
* Use lz4 compression for shuffle files and streams
* Add feature
* Backport improvements
* More compression
---
Cargo.toml | 2 +-
.../core/src/execution_plans/shuffle_writer.rs | 9 +-
ballista/core/src/utils.rs | 9 +-
ballista/executor/src/flight_service.rs | 117 ++++++++-------------
4 files changed, 61 insertions(+), 76 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index e4b5f324..fcdebab6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -29,7 +29,7 @@ members = [
resolver = "2"
[workspace.dependencies]
-arrow = { version = "48.0.0" }
+arrow = { version = "48.0.0", features=["ipc_compression"] }
arrow-flight = { version = "48.0.0", features = ["flight-sql-experimental"] }
arrow-schema = { version = "48.0.0", default-features = false }
configure_me = { version = "0.4.0" }
diff --git a/ballista/core/src/execution_plans/shuffle_writer.rs
b/ballista/core/src/execution_plans/shuffle_writer.rs
index 1896c206..2540a1d2 100644
--- a/ballista/core/src/execution_plans/shuffle_writer.rs
+++ b/ballista/core/src/execution_plans/shuffle_writer.rs
@@ -20,6 +20,8 @@
//! partition is re-partitioned and streamed to disk in Arrow IPC format.
Future stages of the query
//! will use the ShuffleReaderExec to read these results.
+use datafusion::arrow::ipc::writer::IpcWriteOptions;
+use datafusion::arrow::ipc::CompressionType;
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use std::any::Any;
@@ -242,9 +244,14 @@ impl ShuffleWriterExec {
));
debug!("Writing results to {:?}",
path);
- let mut writer = IPCWriter::new(
+ let options =
IpcWriteOptions::default()
+ .try_with_compression(Some(
+ CompressionType::LZ4_FRAME,
+ ))?;
+ let mut writer =
IPCWriter::new_with_options(
&path,
stream.schema().as_ref(),
+ options,
)?;
writer.write(&output_batch)?;
diff --git a/ballista/core/src/utils.rs b/ballista/core/src/utils.rs
index 3252ad72..e16c1b4c 100644
--- a/ballista/core/src/utils.rs
+++ b/ballista/core/src/utils.rs
@@ -25,6 +25,8 @@ use crate::serde::scheduler::PartitionStats;
use async_trait::async_trait;
use datafusion::arrow::datatypes::Schema;
+use datafusion::arrow::ipc::writer::IpcWriteOptions;
+use datafusion::arrow::ipc::CompressionType;
use datafusion::arrow::{ipc::writer::FileWriter, record_batch::RecordBatch};
use datafusion::datasource::physical_plan::{CsvExec, ParquetExec};
use datafusion::error::DataFusionError;
@@ -82,7 +84,12 @@ pub async fn write_stream_to_disk(
let mut num_rows = 0;
let mut num_batches = 0;
let mut num_bytes = 0;
- let mut writer = FileWriter::try_new(file, stream.schema().as_ref())?;
+
+ let options = IpcWriteOptions::default()
+ .try_with_compression(Some(CompressionType::LZ4_FRAME))?;
+
+ let mut writer =
+ FileWriter::try_new_with_options(file, stream.schema().as_ref(),
options)?;
while let Some(result) = stream.next().await {
let batch = result?;
diff --git a/ballista/executor/src/flight_service.rs
b/ballista/executor/src/flight_service.rs
index 4a62bc5f..ed8ffb35 100644
--- a/ballista/executor/src/flight_service.rs
+++ b/ballista/executor/src/flight_service.rs
@@ -21,12 +21,14 @@ use std::convert::TryFrom;
use std::fs::File;
use std::pin::Pin;
-use arrow_flight::SchemaAsIpc;
+use arrow::ipc::CompressionType;
+use arrow_flight::encode::FlightDataEncoderBuilder;
+use arrow_flight::error::FlightError;
use ballista_core::error::BallistaError;
use ballista_core::serde::decode_protobuf;
use ballista_core::serde::scheduler::Action as BallistaAction;
-use arrow::ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
+use arrow::ipc::writer::IpcWriteOptions;
use arrow_flight::{
flight_service_server::FlightService, Action, ActionType, Criteria, Empty,
FlightData, FlightDescriptor, FlightInfo, HandshakeRequest,
HandshakeResponse,
@@ -35,20 +37,16 @@ use arrow_flight::{
use datafusion::arrow::{
error::ArrowError, ipc::reader::FileReader, record_batch::RecordBatch,
};
-use futures::{Stream, StreamExt};
-use log::{debug, info, warn};
+use futures::{Stream, StreamExt, TryStreamExt};
+use log::{debug, info};
use std::io::{Read, Seek};
use tokio::sync::mpsc::channel;
-use tokio::{
- sync::mpsc::{Receiver, Sender},
- task,
-};
+use tokio::sync::mpsc::error::SendError;
+use tokio::{sync::mpsc::Sender, task};
use tokio_stream::wrappers::ReceiverStream;
use tonic::metadata::MetadataValue;
use tonic::{Request, Response, Status, Streaming};
-
-type FlightDataSender = Sender<Result<FlightData, Status>>;
-type FlightDataReceiver = Receiver<Result<FlightData, Status>>;
+use tracing::warn;
/// Service implementing the Apache Arrow Flight Protocol
#[derive(Clone)]
@@ -67,7 +65,7 @@ impl Default for BallistaFlightService {
}
type BoxedFlightStream<T> =
- Pin<Box<dyn Stream<Item = Result<T, Status>> + Send + Sync + 'static>>;
+ Pin<Box<dyn Stream<Item = Result<T, Status>> + Send + 'static>>;
#[tonic::async_trait]
impl FlightService for BallistaFlightService {
@@ -101,19 +99,25 @@ impl FlightService for BallistaFlightService {
let reader =
FileReader::try_new(file, None).map_err(|e|
from_arrow_err(&e))?;
- let (tx, rx): (FlightDataSender, FlightDataReceiver) =
channel(2);
-
- let file_path = path.to_owned();
- // Arrow IPC reader does not implement Sync + Send so we need
to use a channel
- // to communicate
- task::spawn(async move {
- if let Err(e) = stream_flight_data(file_path, reader,
tx).await {
- warn!("Error streaming results: {:?}", e);
+ let (tx, rx) = channel(2);
+ let schema = reader.schema();
+ task::spawn_blocking(move || {
+ if let Err(e) = read_partition(reader, tx) {
+ warn!(error = %e, "error streaming shuffle partition");
}
});
+ let write_options: IpcWriteOptions = IpcWriteOptions::default()
+ .try_with_compression(Some(CompressionType::LZ4_FRAME))
+ .map_err(|e| from_arrow_err(&e))?;
+ let flight_data_stream = FlightDataEncoderBuilder::new()
+ .with_schema(schema)
+ .with_options(write_options)
+ .build(ReceiverStream::new(rx))
+ .map_err(|err| Status::from_error(Box::new(err)));
+
Ok(Response::new(
- Box::pin(ReceiverStream::new(rx)) as Self::DoGetStream
+ Box::pin(flight_data_stream) as Self::DoGetStream
))
}
}
@@ -148,7 +152,7 @@ impl FlightService for BallistaFlightService {
let output = futures::stream::iter(vec![result]);
let str = format!("Bearer {token}");
let mut resp: Response<
- Pin<Box<dyn Stream<Item = Result<_, Status>> + Sync + Send>>,
+ Pin<Box<dyn Stream<Item = Result<_, Status>> + Send + 'static>>,
> = Response::new(Box::pin(output));
let md = MetadataValue::try_from(str)
.map_err(|_| Status::invalid_argument("authorization not
parsable"))?;
@@ -202,67 +206,34 @@ impl FlightService for BallistaFlightService {
}
}
-/// Convert a single RecordBatch into an iterator of FlightData (containing
-/// dictionaries and batches)
-fn create_flight_iter(
- batch: &RecordBatch,
- options: &IpcWriteOptions,
-) -> Box<dyn Iterator<Item = Result<FlightData, Status>>> {
- let data_gen = IpcDataGenerator::default();
- let mut dictionary_tracker = DictionaryTracker::new(false);
- let res = data_gen.encoded_batch(batch, &mut dictionary_tracker, options);
- match res {
- Ok((dicts, batch)) => {
- let flights = dicts
- .into_iter()
- .chain(std::iter::once(batch))
- .map(|x| x.into());
- Box::new(flights.map(Ok))
- }
- Err(e) => Box::new(std::iter::once(Err(from_arrow_err(&e)))),
- }
-}
-
-async fn stream_flight_data<T>(
- file_path: String,
+fn read_partition<T>(
reader: FileReader<T>,
- tx: FlightDataSender,
-) -> Result<(), Status>
+ tx: Sender<Result<RecordBatch, FlightError>>,
+) -> Result<(), FlightError>
where
T: Read + Seek,
{
- let options = arrow::ipc::writer::IpcWriteOptions::default();
- let schema_flight_data = SchemaAsIpc::new(reader.schema().as_ref(),
&options).into();
- send_response(&tx, Ok(schema_flight_data)).await?;
+ if tx.is_closed() {
+ return Err(FlightError::Tonic(Status::internal(
+ "Can't send a batch, channel is closed",
+ )));
+ }
- let mut row_count = 0;
for batch in reader {
- if let Ok(x) = &batch {
- row_count += x.num_rows();
- }
- let batch_flight_data: Vec<_> = batch
- .map(|b| create_flight_iter(&b, &options).collect())
- .map_err(|e| from_arrow_err(&e))?;
- for batch in batch_flight_data.into_iter() {
- send_response(&tx, batch).await?;
- }
+ tx.blocking_send(batch.map_err(|err| err.into()))
+ .map_err(|err| {
+ if let SendError(Err(err)) = err {
+ err
+ } else {
+ FlightError::Tonic(Status::internal(
+ "Can't send a batch, something went wrong",
+ ))
+ }
+ })?
}
- debug!(
- "FetchPartition streamed {} rows for file {}",
- row_count, file_path
- );
Ok(())
}
-async fn send_response(
- tx: &FlightDataSender,
- data: Result<FlightData, Status>,
-) -> Result<(), Status> {
- tx.send(data)
- .await
- .map_err(|e| Status::internal(format!("{e:?}")))
-}
-
fn from_arrow_err(e: &ArrowError) -> Status {
Status::internal(format!("ArrowError: {e:?}"))
}