milenkovicm commented on code in PR #1478:
URL: 
https://github.com/apache/datafusion-ballista/pull/1478#discussion_r2872229296


##########
ballista/core/src/execution_plans/distributed_query.rs:
##########
@@ -453,6 +485,160 @@ async fn execute_query(
         };
     }
 }
+/// After a job is scheduled, the client waits
+/// for job updates, which are streamed back
+/// from the server to the client.
+#[allow(clippy::too_many_arguments)]
+async fn execute_query_push(
+    scheduler_url: String,
+    query: ExecuteQueryParams,
+    max_message_size: usize,
+    grpc_config: GrpcClientConfig,
+    metrics: Arc<ExecutionPlanMetricsSet>,
+    partition: usize,
+    session_config: SessionConfig,
+) -> Result<impl Stream<Item = Result<RecordBatch>> + Send> {
+    let grpc_interceptor = session_config.ballista_grpc_interceptor();
+    let customize_endpoint =
+        session_config.ballista_override_create_grpc_client_endpoint();
+    let use_tls = session_config.ballista_use_tls();
+
+    // Capture query submission time for total_query_time_ms
+    let query_start_time = std::time::Instant::now();
+
+    info!("Connecting to Ballista scheduler at {scheduler_url}");
+    // TODO reuse the scheduler client to avoid connecting to the Ballista scheduler again and again
+    let mut endpoint =
+        create_grpc_client_endpoint(scheduler_url.clone(), Some(&grpc_config))
+            .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
+
+    if let Some(ref customize) = customize_endpoint {
+        endpoint = customize
+            .configure_endpoint(endpoint)
+            .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
+    }
+
+    let connection = endpoint
+        .connect()
+        .await
+        .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
+
+    let mut scheduler = SchedulerGrpcClient::with_interceptor(
+        connection,
+        grpc_interceptor.as_ref().clone(),
+    )
+    .max_encoding_message_size(max_message_size)
+    .max_decoding_message_size(max_message_size);
+
+    let mut query_status_stream = scheduler
+        .execute_query_push(query)
+        .await
+        .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?
+        .into_inner();
+
+    let mut prev_status: Option<job_status::Status> = None;
+
+    loop {
+        let item = query_status_stream
+            .next()

Review Comment:
   we do not know how long the job is going to run, so it is hard to set a limit



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to