danielhumanmod commented on code in PR #1478:
URL:
https://github.com/apache/datafusion-ballista/pull/1478#discussion_r2868226865
##########
ballista/core/src/execution_plans/distributed_query.rs:
##########
@@ -453,6 +485,160 @@ async fn execute_query(
};
}
}
+/// After job is scheduled client waits
+/// for job updates, which are streamed back
+/// from server to client
+#[allow(clippy::too_many_arguments)]
+async fn execute_query_push(
+ scheduler_url: String,
+ query: ExecuteQueryParams,
+ max_message_size: usize,
+ grpc_config: GrpcClientConfig,
+ metrics: Arc<ExecutionPlanMetricsSet>,
+ partition: usize,
+ session_config: SessionConfig,
+) -> Result<impl Stream<Item = Result<RecordBatch>> + Send> {
+ let grpc_interceptor = session_config.ballista_grpc_interceptor();
+ let customize_endpoint =
+ session_config.ballista_override_create_grpc_client_endpoint();
+ let use_tls = session_config.ballista_use_tls();
+
+ // Capture query submission time for total_query_time_ms
+ let query_start_time = std::time::Instant::now();
+
+ info!("Connecting to Ballista scheduler at {scheduler_url}");
+ // TODO reuse the scheduler to avoid connecting to the Ballista scheduler
again and again
+ let mut endpoint =
+ create_grpc_client_endpoint(scheduler_url.clone(), Some(&grpc_config))
+ .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
+
+ if let Some(ref customize) = customize_endpoint {
+ endpoint = customize
+ .configure_endpoint(endpoint)
+ .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
+ }
+
+ let connection = endpoint
+ .connect()
+ .await
+ .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
+
+ let mut scheduler = SchedulerGrpcClient::with_interceptor(
+ connection,
+ grpc_interceptor.as_ref().clone(),
+ )
+ .max_encoding_message_size(max_message_size)
+ .max_decoding_message_size(max_message_size);
+
+ let mut query_status_stream = scheduler
+ .execute_query_push(query)
+ .await
+ .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?
+ .into_inner();
+
+ let mut prev_status: Option<job_status::Status> = None;
+
+ loop {
+ let item = query_status_stream
+ .next()
+ .await
+ .ok_or(DataFusionError::Execution(
+ "Stream closed without job completing".to_string(),
Review Comment:
Based on my understanding, the current user experience is that users receive
an error when the stream disconnects.
For better UX, one option we could consider is adding a fallback to polling
when an error occurs. (Totally optional — maybe in a follow-up PR.)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]