Ted-Jiang commented on code in PR #738:
URL: https://github.com/apache/arrow-ballista/pull/738#discussion_r1161557758
##########
ballista/core/src/client.rs:
##########
@@ -117,32 +120,69 @@ impl BallistaClient {
.encode(&mut buf)
.map_err(|e| BallistaError::GrpcActionError(format!("{e:?}")))?;
- let request = tonic::Request::new(Ticket { ticket: buf.into() });
-
- let mut stream = self
- .flight_client
- .do_get(request)
- .await
- .map_err(|e| BallistaError::GrpcActionError(format!("{e:?}")))?
- .into_inner();
+ for i in 0..IO_RETRIES_TIMES {
+ if i > 0 {
+ warn!(
+ "Remote shuffle read fail, retry {} times, sleep {} ms.",
+ i, IO_RETRY_WAIT_TIME_MS
+ );
+ tokio::time::sleep(std::time::Duration::from_millis(
+ IO_RETRY_WAIT_TIME_MS,
+ ))
+ .await;
+ }
- // the schema should be the first message returned, else client should
error
- match stream
- .message()
- .await
- .map_err(|e| BallistaError::GrpcActionError(format!("{e:?}")))?
- {
- Some(flight_data) => {
- // convert FlightData to a stream
- let schema = Arc::new(Schema::try_from(&flight_data)?);
-
- // all the remaining stream messages should be dictionary and
record batches
- Ok(Box::pin(FlightDataStream::new(stream, schema)))
+ let request = tonic::Request::new(Ticket {
+ ticket: buf.clone().into(),
+ });
+ let result = self.flight_client.do_get(request).await;
+
+ let res = match result {
+ Ok(res) => res,
+ Err(ref err) => {
+ // IO related error like connection timeout, reset... will
warp with Code::Unknown
Review Comment:
A `ConnectionReset` error needs to be retried, and it is reported with the `Unknown` status code:
```
(Execution("FetchFailed(\"127.0.0.1\",
\"37258e22-f50e-482b-9c0d-d1e84b83d5e5\", 3, 2, \"Status { code: Unknown,
message: \\\"transport error\\\", source:
Some(tonic::transport::Error(Transport, hyper::Error(Io,
Kind(ConnectionReset)))) }\")"))
```
A missing partition file ("no such file") is reported with the `Internal` status code instead:
```
DataFusion error: ArrowError(ExternalError("Arrow error: External error:
Shuffle fetch partition error from Executor 127.0.0. :
ff16a886-ca14-4a1b-bdc3-560c6adf699c, map_stage 2, map_partition 18, error
desc: Status { code: Internal, message: \"Ballista Error: General(\\\"Failed to
open partition file at /home
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]