This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new 794dd95e Use local shuffle reader in containerized environments and
some improvements. (#399)
794dd95e is described below
commit 794dd95e0357dc6ace5ea315a88948e9d8f2b2f3
Author: Yang Jiang <[email protected]>
AuthorDate: Fri Oct 21 08:51:41 2022 +0800
Use local shuffle reader in containerized environments and some
improvements. (#399)
---
.../core/src/execution_plans/shuffle_reader.rs | 111 +++++++++++++--------
1 file changed, 72 insertions(+), 39 deletions(-)
diff --git a/ballista/core/src/execution_plans/shuffle_reader.rs b/ballista/core/src/execution_plans/shuffle_reader.rs
index 514554cc..298b3021 100644
--- a/ballista/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/core/src/execution_plans/shuffle_reader.rs
@@ -18,6 +18,7 @@
use async_trait::async_trait;
use std::any::Any;
use std::collections::HashMap;
+use std::fmt::Debug;
use std::fs::File;
use std::pin::Pin;
use std::result;
@@ -46,7 +47,7 @@ use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::common::AbortOnDropMany;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use itertools::Itertools;
-use log::{debug, error, info};
+use log::{error, info};
use rand::prelude::SliceRandom;
use rand::thread_rng;
use tokio::sync::{mpsc, Semaphore};
@@ -274,44 +275,50 @@ fn send_fetch_partitions(
let (response_sender, response_receiver) = mpsc::channel(max_request_num);
let semaphore = Arc::new(Semaphore::new(max_request_num));
let mut join_handles = vec![];
- for p in partition_locations.into_iter() {
- if check_is_local_location(&p.executor_meta.host) {
- // local shuffle reader should not be restrict
- debug!("Get local partition file from {}", &p.executor_meta.host);
- let response_sender = response_sender.clone();
- let join_handle = tokio::spawn(async move {
- let r = PartitionReaderEnum::Local.fetch_partition(&p).await;
- if let Err(e) = response_sender.send(r).await {
- error!("Fail to send response event to the channel due to {}", e);
- }
- });
- join_handles.push(join_handle);
- } else {
- debug!("Get remote partition file from {}", &p.executor_meta.host);
- let semaphore = semaphore.clone();
- let response_sender = response_sender.clone();
- let join_handle = tokio::spawn(async move {
- // Block if exceeds max request number
- let permit = semaphore.acquire_owned().await.unwrap();
- let r = PartitionReaderEnum::FlightRemote.fetch_partition(&p).await;
- // Block if the channel buffer is ful
- if let Err(e) = response_sender.send(r).await {
- error!("Fail to send response event to the channel due to {}", e);
- }
- // Increase semaphore by dropping existing permits.
- drop(permit);
- });
- join_handles.push(join_handle);
+ let (local_locations, remote_locations): (Vec<_>, Vec<_>) = partition_locations
+ .into_iter()
+ .partition(check_is_local_location);
+
+ info!(
+ "local shuffle file counts:{}, remote shuffle file count:{}.",
+ local_locations.len(),
+ remote_locations.len()
+ );
+
+ // keep local shuffle files reading in serial order for memory control.
+ let response_sender_c = response_sender.clone();
+ let join_handle = tokio::spawn(async move {
+ for p in local_locations {
+ let r = PartitionReaderEnum::Local.fetch_partition(&p).await;
+ if let Err(e) = response_sender_c.send(r).await {
+ error!("Fail to send response event to the channel due to {}", e);
+ }
}
+ });
+ join_handles.push(join_handle);
+
+ for p in remote_locations.into_iter() {
+ let semaphore = semaphore.clone();
+ let response_sender = response_sender.clone();
+ let join_handle = tokio::spawn(async move {
+ // Block if exceeds max request number
+ let permit = semaphore.acquire_owned().await.unwrap();
+ let r = PartitionReaderEnum::FlightRemote.fetch_partition(&p).await;
+ // Block if the channel buffer is ful
+ if let Err(e) = response_sender.send(r).await {
+ error!("Fail to send response event to the channel due to {}", e);
+ }
+ // Increase semaphore by dropping existing permits.
+ drop(permit);
+ });
+ join_handles.push(join_handle);
}
AbortableReceiverStream::create(response_receiver, join_handles)
}
-fn check_is_local_location(host: &str) -> bool {
- host == "localhost"
- || host == "127.0.0.1"
- || host == sys_info::hostname().unwrap_or_else(|_| "localhost".to_string())
+fn check_is_local_location(location: &PartitionLocation) -> bool {
+ std::path::Path::new(location.path.as_str()).exists()
}
/// Partition reader Trait, different partition reader can have
@@ -334,13 +341,14 @@ enum PartitionReaderEnum {
#[async_trait]
impl PartitionReader for PartitionReaderEnum {
+ // Notice return `BallistaError::FetchFailed` will let scheduler re-schedule the task.
async fn fetch_partition(
&self,
location: &PartitionLocation,
) -> result::Result<SendableRecordBatchStream, BallistaError> {
match self {
PartitionReaderEnum::FlightRemote => fetch_partition_remote(location).await,
- PartitionReaderEnum::Local => fetch_partition_local(&location.path).await,
+ PartitionReaderEnum::Local => fetch_partition_local(location).await,
PartitionReaderEnum::ObjectStoreRemote => {
fetch_partition_object_store(location).await
}
@@ -377,17 +385,39 @@ async fn fetch_partition_remote(
}
async fn fetch_partition_local(
- path: &str,
+ location: &PartitionLocation,
) -> result::Result<SendableRecordBatchStream, BallistaError> {
- debug!("FetchPartition local reading {}", &path);
+ let path = &location.path;
+ let metadata = &location.executor_meta;
+ let partition_id = &location.partition_id;
+
+ let reader = fetch_partition_local_inner(path).map_err(|e| {
+ // return BallistaError::FetchFailed may let scheduler retry this task.
+ BallistaError::FetchFailed(
+ metadata.id.clone(),
+ partition_id.stage_id,
+ partition_id.partition_id,
+ e.to_string(),
+ )
+ })?;
+ Ok(Box::pin(LocalShuffleStream::new(reader)))
+}
+
+fn fetch_partition_local_inner(
+ path: &str,
+) -> result::Result<FileReader<File>, BallistaError> {
let file = File::open(&path).map_err(|e| {
BallistaError::General(format!(
"Failed to open partition file at {}: {:?}",
path, e
))
})?;
- let reader = FileReader::try_new(file, None).map_err(BallistaError::ArrowError)?;
- Ok(Box::pin(LocalShuffleStream::new(reader)))
+ FileReader::try_new(file, None).map_err(|e| {
+ BallistaError::General(format!(
+ "Failed to new arrow FileReader at {}: {:?}",
+ path, e
+ ))
+ })
}
async fn fetch_partition_object_store(
@@ -569,7 +599,10 @@ mod tests {
// from to input partitions test the first one with two batches
let file_path = path.value(0);
- let mut stream = fetch_partition_local(file_path).await.unwrap();
+ let reader = fetch_partition_local_inner(file_path).unwrap();
+
+ let mut stream: Pin<Box<dyn RecordBatchStream + Send>> =
+ async { Box::pin(LocalShuffleStream::new(reader)) }.await;
let result = utils::collect_stream(&mut stream)
.await