This is an automated email from the ASF dual-hosted git repository.
yangjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new 47718d86 [feature] support shuffle read with retry when facing IO error. (#738)
47718d86 is described below
commit 47718d8624c40e32a559a6078d262b68a96012ea
Author: Yang Jiang <[email protected]>
AuthorDate: Fri Apr 21 11:13:59 2023 +0800
[feature] support shuffle read with retry when facing IO error. (#738)
* [feature] support shuffle read with retry when facing IO error.
---
ballista/core/src/client.rs | 91 ++++++++++++++++++++++++++++++++-------------
1 file changed, 66 insertions(+), 25 deletions(-)
diff --git a/ballista/core/src/client.rs b/ballista/core/src/client.rs
index d4179b32..4382dd05 100644
--- a/ballista/core/src/client.rs
+++ b/ballista/core/src/client.rs
@@ -43,9 +43,9 @@ use crate::serde::protobuf;
use crate::utils::create_grpc_client_connection;
use datafusion::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
use futures::{Stream, StreamExt};
-use log::debug;
+use log::{debug, warn};
use prost::Message;
-use tonic::Streaming;
+use tonic::{Code, Streaming};
/// Client for interacting with Ballista executors.
#[derive(Clone)]
@@ -53,6 +53,10 @@ pub struct BallistaClient {
flight_client: FlightServiceClient<tonic::transport::channel::Channel>,
}
+// TODO: make these shuffle-read retry settings configurable.
+const IO_RETRIES_TIMES: u8 = 3; // total request attempts in the retry loop, including the first (so at most 2 retries)
+const IO_RETRY_WAIT_TIME_MS: u64 = 3000; // sleep before each retry attempt (not before the first attempt)
+
impl BallistaClient {
/// Create a new BallistaClient to connect to the executor listening on
the specified
/// host and port
@@ -117,32 +121,69 @@ impl BallistaClient {
.encode(&mut buf)
.map_err(|e| BallistaError::GrpcActionError(format!("{e:?}")))?;
- let request = tonic::Request::new(Ticket { ticket: buf.into() });
-
- let mut stream = self
- .flight_client
- .do_get(request)
- .await
- .map_err(|e| BallistaError::GrpcActionError(format!("{e:?}")))?
- .into_inner();
+ for i in 0..IO_RETRIES_TIMES {
+ if i > 0 {
+ warn!(
+ "Remote shuffle read fail, retry {} times, sleep {} ms.",
+ i, IO_RETRY_WAIT_TIME_MS
+ );
+ tokio::time::sleep(std::time::Duration::from_millis(
+ IO_RETRY_WAIT_TIME_MS,
+ ))
+ .await;
+ }
- // the schema should be the first message returned, else client should
error
- match stream
- .message()
- .await
- .map_err(|e| BallistaError::GrpcActionError(format!("{e:?}")))?
- {
- Some(flight_data) => {
- // convert FlightData to a stream
- let schema = Arc::new(Schema::try_from(&flight_data)?);
-
- // all the remaining stream messages should be dictionary and
record batches
- Ok(Box::pin(FlightDataStream::new(stream, schema)))
+ let request = tonic::Request::new(Ticket {
+ ticket: buf.clone().into(),
+ });
+ let result = self.flight_client.do_get(request).await;
+
+ let res = match result {
+ Ok(res) => res,
+ Err(ref err) => {
+ // IO related error like connection timeout, reset... will
warp with Code::Unknown
+ // This means IO related error will retry.
+ if i == IO_RETRIES_TIMES - 1 || err.code() !=
Code::Unknown {
+ return BallistaError::GrpcActionError(format!(
+ "{:?}",
+ result.unwrap_err()
+ ))
+ .into();
+ }
+ // retry request
+ continue;
+ }
+ };
+
+ let mut stream = res.into_inner();
+ match stream.message().await {
+ Ok(res) => {
+ return match res {
+ Some(flight_data) => {
+ // convert FlightData to a stream
+ let schema =
Arc::new(Schema::try_from(&flight_data)?);
+
+ // all the remaining stream messages should be
dictionary and record batches
+ Ok(Box::pin(FlightDataStream::new(stream, schema)))
+ }
+ None => Err(BallistaError::GrpcActionError(
+ "Did not receive schema batch from flight
server".to_string(),
+ )),
+ };
+ }
+ Err(e) => {
+ if i == IO_RETRIES_TIMES - 1 || e.code() != Code::Unknown {
+ return BallistaError::GrpcActionError(format!(
+ "{:?}",
+ e.to_string()
+ ))
+ .into();
+ }
+ continue;
+ }
}
- None => Err(BallistaError::GrpcActionError(
- "Did not receive schema batch from flight server".to_string(),
- )),
}
+ unreachable!("Did not receive schema batch from flight server");
}
}