This is an automated email from the ASF dual-hosted git repository.

yangjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new 47718d86 [feature] support shuffle read with retry when facing IO error. (#738)
47718d86 is described below

commit 47718d8624c40e32a559a6078d262b68a96012ea
Author: Yang Jiang <[email protected]>
AuthorDate: Fri Apr 21 11:13:59 2023 +0800

    [feature] support shuffle read with retry when facing IO error. (#738)
    
    * [feature] support shuffle read with retry when facing IO error.
---
 ballista/core/src/client.rs | 91 ++++++++++++++++++++++++++++++++-------------
 1 file changed, 66 insertions(+), 25 deletions(-)

diff --git a/ballista/core/src/client.rs b/ballista/core/src/client.rs
index d4179b32..4382dd05 100644
--- a/ballista/core/src/client.rs
+++ b/ballista/core/src/client.rs
@@ -43,9 +43,9 @@ use crate::serde::protobuf;
 use crate::utils::create_grpc_client_connection;
 use datafusion::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
 use futures::{Stream, StreamExt};
-use log::debug;
+use log::{debug, warn};
 use prost::Message;
-use tonic::Streaming;
+use tonic::{Code, Streaming};
 
 /// Client for interacting with Ballista executors.
 #[derive(Clone)]
@@ -53,6 +53,10 @@ pub struct BallistaClient {
     flight_client: FlightServiceClient<tonic::transport::channel::Channel>,
 }
 
+//TODO make this configurable
+const IO_RETRIES_TIMES: u8 = 3;
+const IO_RETRY_WAIT_TIME_MS: u64 = 3000;
+
 impl BallistaClient {
     /// Create a new BallistaClient to connect to the executor listening on 
the specified
     /// host and port
@@ -117,32 +121,69 @@ impl BallistaClient {
             .encode(&mut buf)
             .map_err(|e| BallistaError::GrpcActionError(format!("{e:?}")))?;
 
-        let request = tonic::Request::new(Ticket { ticket: buf.into() });
-
-        let mut stream = self
-            .flight_client
-            .do_get(request)
-            .await
-            .map_err(|e| BallistaError::GrpcActionError(format!("{e:?}")))?
-            .into_inner();
+        for i in 0..IO_RETRIES_TIMES {
+            if i > 0 {
+                warn!(
+                    "Remote shuffle read fail, retry {} times, sleep {} ms.",
+                    i, IO_RETRY_WAIT_TIME_MS
+                );
+                tokio::time::sleep(std::time::Duration::from_millis(
+                    IO_RETRY_WAIT_TIME_MS,
+                ))
+                .await;
+            }
 
-        // the schema should be the first message returned, else client should 
error
-        match stream
-            .message()
-            .await
-            .map_err(|e| BallistaError::GrpcActionError(format!("{e:?}")))?
-        {
-            Some(flight_data) => {
-                // convert FlightData to a stream
-                let schema = Arc::new(Schema::try_from(&flight_data)?);
-
-                // all the remaining stream messages should be dictionary and 
record batches
-                Ok(Box::pin(FlightDataStream::new(stream, schema)))
+            let request = tonic::Request::new(Ticket {
+                ticket: buf.clone().into(),
+            });
+            let result = self.flight_client.do_get(request).await;
+
+            let res = match result {
+                Ok(res) => res,
+                Err(ref err) => {
+                    // IO related error like connection timeout, reset... will 
warp with Code::Unknown
+                    // This means IO related error will retry.
+                    if i == IO_RETRIES_TIMES - 1 || err.code() != 
Code::Unknown {
+                        return BallistaError::GrpcActionError(format!(
+                            "{:?}",
+                            result.unwrap_err()
+                        ))
+                        .into();
+                    }
+                    // retry request
+                    continue;
+                }
+            };
+
+            let mut stream = res.into_inner();
+            match stream.message().await {
+                Ok(res) => {
+                    return match res {
+                        Some(flight_data) => {
+                            // convert FlightData to a stream
+                            let schema = 
Arc::new(Schema::try_from(&flight_data)?);
+
+                            // all the remaining stream messages should be 
dictionary and record batches
+                            Ok(Box::pin(FlightDataStream::new(stream, schema)))
+                        }
+                        None => Err(BallistaError::GrpcActionError(
+                            "Did not receive schema batch from flight 
server".to_string(),
+                        )),
+                    };
+                }
+                Err(e) => {
+                    if i == IO_RETRIES_TIMES - 1 || e.code() != Code::Unknown {
+                        return BallistaError::GrpcActionError(format!(
+                            "{:?}",
+                            e.to_string()
+                        ))
+                        .into();
+                    }
+                    continue;
+                }
             }
-            None => Err(BallistaError::GrpcActionError(
-                "Did not receive schema batch from flight server".to_string(),
-            )),
         }
+        unreachable!("Did not receive schema batch from flight server");
     }
 }
 

Reply via email to