This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new 794dd95e Use local shuffle reader in containerized environments and some improvements. (#399)
794dd95e is described below

commit 794dd95e0357dc6ace5ea315a88948e9d8f2b2f3
Author: Yang Jiang <[email protected]>
AuthorDate: Fri Oct 21 08:51:41 2022 +0800

    Use local shuffle reader in containerized environments and some improvements. (#399)
---
 .../core/src/execution_plans/shuffle_reader.rs     | 111 +++++++++++++--------
 1 file changed, 72 insertions(+), 39 deletions(-)

diff --git a/ballista/core/src/execution_plans/shuffle_reader.rs b/ballista/core/src/execution_plans/shuffle_reader.rs
index 514554cc..298b3021 100644
--- a/ballista/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/core/src/execution_plans/shuffle_reader.rs
@@ -18,6 +18,7 @@
 use async_trait::async_trait;
 use std::any::Any;
 use std::collections::HashMap;
+use std::fmt::Debug;
 use std::fs::File;
 use std::pin::Pin;
 use std::result;
@@ -46,7 +47,7 @@ use datafusion::execution::context::TaskContext;
 use datafusion::physical_plan::common::AbortOnDropMany;
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use itertools::Itertools;
-use log::{debug, error, info};
+use log::{error, info};
 use rand::prelude::SliceRandom;
 use rand::thread_rng;
 use tokio::sync::{mpsc, Semaphore};
@@ -274,44 +275,50 @@ fn send_fetch_partitions(
     let (response_sender, response_receiver) = mpsc::channel(max_request_num);
     let semaphore = Arc::new(Semaphore::new(max_request_num));
     let mut join_handles = vec![];
-    for p in partition_locations.into_iter() {
-        if check_is_local_location(&p.executor_meta.host) {
-            // local shuffle reader should not be restrict
-            debug!("Get local partition file from {}", &p.executor_meta.host);
-            let response_sender = response_sender.clone();
-            let join_handle = tokio::spawn(async move {
-                let r = PartitionReaderEnum::Local.fetch_partition(&p).await;
-                if let Err(e) = response_sender.send(r).await {
-                    error!("Fail to send response event to the channel due to 
{}", e);
-                }
-            });
-            join_handles.push(join_handle);
-        } else {
-            debug!("Get remote partition file from {}", &p.executor_meta.host);
-            let semaphore = semaphore.clone();
-            let response_sender = response_sender.clone();
-            let join_handle = tokio::spawn(async move {
-                // Block if exceeds max request number
-                let permit = semaphore.acquire_owned().await.unwrap();
-                let r = 
PartitionReaderEnum::FlightRemote.fetch_partition(&p).await;
-                // Block if the channel buffer is ful
-                if let Err(e) = response_sender.send(r).await {
-                    error!("Fail to send response event to the channel due to 
{}", e);
-                }
-                // Increase semaphore by dropping existing permits.
-                drop(permit);
-            });
-            join_handles.push(join_handle);
+    let (local_locations, remote_locations): (Vec<_>, Vec<_>) = 
partition_locations
+        .into_iter()
+        .partition(check_is_local_location);
+
+    info!(
+        "local shuffle file counts:{}, remote shuffle file count:{}.",
+        local_locations.len(),
+        remote_locations.len()
+    );
+
+    // keep local shuffle files reading in serial order for memory control.
+    let response_sender_c = response_sender.clone();
+    let join_handle = tokio::spawn(async move {
+        for p in local_locations {
+            let r = PartitionReaderEnum::Local.fetch_partition(&p).await;
+            if let Err(e) = response_sender_c.send(r).await {
+                error!("Fail to send response event to the channel due to {}", 
e);
+            }
         }
+    });
+    join_handles.push(join_handle);
+
+    for p in remote_locations.into_iter() {
+        let semaphore = semaphore.clone();
+        let response_sender = response_sender.clone();
+        let join_handle = tokio::spawn(async move {
+            // Block if exceeds max request number
+            let permit = semaphore.acquire_owned().await.unwrap();
+            let r = 
PartitionReaderEnum::FlightRemote.fetch_partition(&p).await;
+            // Block if the channel buffer is ful
+            if let Err(e) = response_sender.send(r).await {
+                error!("Fail to send response event to the channel due to {}", 
e);
+            }
+            // Increase semaphore by dropping existing permits.
+            drop(permit);
+        });
+        join_handles.push(join_handle);
     }
 
     AbortableReceiverStream::create(response_receiver, join_handles)
 }
 
-fn check_is_local_location(host: &str) -> bool {
-    host == "localhost"
-        || host == "127.0.0.1"
-        || host == sys_info::hostname().unwrap_or_else(|_| 
"localhost".to_string())
+fn check_is_local_location(location: &PartitionLocation) -> bool {
+    std::path::Path::new(location.path.as_str()).exists()
 }
 
 /// Partition reader Trait, different partition reader can have
@@ -334,13 +341,14 @@ enum PartitionReaderEnum {
 
 #[async_trait]
 impl PartitionReader for PartitionReaderEnum {
+    // Notice return `BallistaError::FetchFailed` will let scheduler 
re-schedule the task.
     async fn fetch_partition(
         &self,
         location: &PartitionLocation,
     ) -> result::Result<SendableRecordBatchStream, BallistaError> {
         match self {
             PartitionReaderEnum::FlightRemote => 
fetch_partition_remote(location).await,
-            PartitionReaderEnum::Local => 
fetch_partition_local(&location.path).await,
+            PartitionReaderEnum::Local => 
fetch_partition_local(location).await,
             PartitionReaderEnum::ObjectStoreRemote => {
                 fetch_partition_object_store(location).await
             }
@@ -377,17 +385,39 @@ async fn fetch_partition_remote(
 }
 
 async fn fetch_partition_local(
-    path: &str,
+    location: &PartitionLocation,
 ) -> result::Result<SendableRecordBatchStream, BallistaError> {
-    debug!("FetchPartition local reading {}", &path);
+    let path = &location.path;
+    let metadata = &location.executor_meta;
+    let partition_id = &location.partition_id;
+
+    let reader = fetch_partition_local_inner(path).map_err(|e| {
+        // return BallistaError::FetchFailed may let scheduler retry this task.
+        BallistaError::FetchFailed(
+            metadata.id.clone(),
+            partition_id.stage_id,
+            partition_id.partition_id,
+            e.to_string(),
+        )
+    })?;
+    Ok(Box::pin(LocalShuffleStream::new(reader)))
+}
+
+fn fetch_partition_local_inner(
+    path: &str,
+) -> result::Result<FileReader<File>, BallistaError> {
     let file = File::open(&path).map_err(|e| {
         BallistaError::General(format!(
             "Failed to open partition file at {}: {:?}",
             path, e
         ))
     })?;
-    let reader = FileReader::try_new(file, 
None).map_err(BallistaError::ArrowError)?;
-    Ok(Box::pin(LocalShuffleStream::new(reader)))
+    FileReader::try_new(file, None).map_err(|e| {
+        BallistaError::General(format!(
+            "Failed to new arrow FileReader at {}: {:?}",
+            path, e
+        ))
+    })
 }
 
 async fn fetch_partition_object_store(
@@ -569,7 +599,10 @@ mod tests {
 
         // from to input partitions test the first one with two batches
         let file_path = path.value(0);
-        let mut stream = fetch_partition_local(file_path).await.unwrap();
+        let reader = fetch_partition_local_inner(file_path).unwrap();
+
+        let mut stream: Pin<Box<dyn RecordBatchStream + Send>> =
+            async { Box::pin(LocalShuffleStream::new(reader)) }.await;
 
         let result = utils::collect_stream(&mut stream)
             .await

Reply via email to