martin-g commented on code in PR #1478:
URL: 
https://github.com/apache/datafusion-ballista/pull/1478#discussion_r2871998412


##########
ballista/scheduler/src/scheduler_server/grpc.rs:
##########
@@ -351,83 +362,107 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerGrpc
                 .and_then(|s| s.value.clone())
                 .unwrap_or_default();
 
-            let job_id = self.state.task_manager.generate_job_id();
+            info!(
+                "execution query (PUSH) job received - session_id: 
{session_id}, operation_id: {operation_id}, job_name: {job_name}"
+            );
+
+            let (_session_id, session_ctx) = self
+                .create_context(&settings, session_id.clone())
+                .await
+                .map_err(|e| {
+                    Status::internal(format!("Failed to create SessionContext: 
{e:?}"))
+                })?;
+
+            let plan = self.parse_plan(query, &session_ctx).await.map_err(|e| {
+                let msg = format!("Could not parse plan: {e}");
+                error!("{}", msg);
+
+                Status::invalid_argument(msg)
+            })?;
+
+            debug!(
+                "Decoded logical plan for execution:\n{}",
+                plan.display_indent()
+            );
+            log::trace!("setting job name: {job_name}");
+
+            let flight_proxy = self.flight_proxy_config();
+
+            let (subscriber, rx) = tokio::sync::mpsc::channel::<JobStatus>(16);
+            let stream = ReceiverStream::new(rx).map(move |status| {
+                Ok::<_, tonic::Status>(GetJobStatusResult {
+                    status: Some(status),
+                    flight_proxy: flight_proxy.clone(),
+                })
+            });
+
+            log::trace!("setting job name: {job_name}");

Review Comment:
   This trace duplicates the earlier one at line 387.



##########
ballista/core/src/execution_plans/distributed_query.rs:
##########
@@ -453,6 +485,160 @@ async fn execute_query(
         };
     }
 }
+/// After job is scheduled client waits
+/// for job updates, which are streamed back
+/// from server to client
+#[allow(clippy::too_many_arguments)]
+async fn execute_query_push(
+    scheduler_url: String,
+    query: ExecuteQueryParams,
+    max_message_size: usize,
+    grpc_config: GrpcClientConfig,
+    metrics: Arc<ExecutionPlanMetricsSet>,
+    partition: usize,
+    session_config: SessionConfig,
+) -> Result<impl Stream<Item = Result<RecordBatch>> + Send> {
+    let grpc_interceptor = session_config.ballista_grpc_interceptor();
+    let customize_endpoint =
+        session_config.ballista_override_create_grpc_client_endpoint();
+    let use_tls = session_config.ballista_use_tls();
+
+    // Capture query submission time for total_query_time_ms
+    let query_start_time = std::time::Instant::now();
+
+    info!("Connecting to Ballista scheduler at {scheduler_url}");
+    // TODO reuse the scheduler to avoid connecting to the Ballista scheduler 
again and again
+    let mut endpoint =
+        create_grpc_client_endpoint(scheduler_url.clone(), Some(&grpc_config))
+            .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
+
+    if let Some(ref customize) = customize_endpoint {
+        endpoint = customize
+            .configure_endpoint(endpoint)
+            .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
+    }
+
+    let connection = endpoint
+        .connect()
+        .await
+        .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
+
+    let mut scheduler = SchedulerGrpcClient::with_interceptor(
+        connection,
+        grpc_interceptor.as_ref().clone(),
+    )
+    .max_encoding_message_size(max_message_size)
+    .max_decoding_message_size(max_message_size);
+
+    let mut query_status_stream = scheduler
+        .execute_query_push(query)
+        .await
+        .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?
+        .into_inner();
+
+    let mut prev_status: Option<job_status::Status> = None;
+
+    loop {
+        let item = query_status_stream
+            .next()

Review Comment:
   Should this wait for a new message only for a limited time? I.e., does it 
need a timeout?



##########
ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs:
##########
@@ -108,10 +110,33 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>
                 let state = self.state.clone();
                 tokio::spawn(async move {
                     let event = if let Err(e) = state
-                        .submit_job(&job_id, &job_name, session_ctx, &plan, 
queued_at)
+                        .submit_job(
+                            &job_id,
+                            &job_name,
+                            session_ctx,
+                            &plan,
+                            queued_at,
+                            subscriber.clone(),
+                        )
                         .await
                     {
+                        let error = e.to_string();
                         let fail_message = format!("Error planning job 
{job_id}: {e:?}");
+
+                        // this is a corner case, as most of job status 
changes are handled in
+                        // job state, after job is submitted to job state
+                        if let Some(subscriber) = subscriber {
+                            let job_status = JobStatus {
+                                job_id: job_id.clone(),
+                                job_name,
+                                status: 
Some(ballista_core::serde::protobuf::job_status::Status::Failed(
+                                    FailedJob { error, queued_at, started_at: 
timestamp_millis(), ended_at: timestamp_millis() }

Review Comment:
   nit: Extract `timestamp_millis()` into a local variable and reuse it for 
both `started_at` and `ended_at`.



##########
ballista/scheduler/src/cluster/memory.rs:
##########
@@ -380,12 +380,34 @@ impl InMemoryJobState {
     }
 }
 
+#[derive(Clone)]
+struct ExtendedJobStatus {
+    status: JobStatus,
+    subscriber: Option<JobStatusSubscriber>,
+}
+
+impl ExtendedJobStatus {
+    async fn update_subscribers(&self, status: JobStatus) {
+        if let Some(subscriber) = &self.subscriber {
+            let _ = subscriber.send(status).await;

Review Comment:
   Consider spawning a new task for the send because this may block the sender 
if the receiver is slow for some reason.



##########
ballista/core/src/execution_plans/distributed_query.rs:
##########
@@ -247,33 +247,62 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for 
DistributedQueryExec<T> {
 
         let session_config = context.session_config().clone();
 
-        let stream = futures::stream::once(
-            execute_query(
-                self.scheduler_url.clone(),
-                self.session_id.clone(),
-                query,
-                self.config.default_grpc_client_max_message_size(),
-                GrpcClientConfig::from(&self.config),
-                Arc::new(self.metrics.clone()),
-                partition,
-                session_config,
+        if session_config.ballista_config().client_pull() {
+            let stream = futures::stream::once(
+                execute_query_pull(
+                    self.scheduler_url.clone(),
+                    self.session_id.clone(),
+                    query,
+                    self.config.default_grpc_client_max_message_size(),
+                    GrpcClientConfig::from(&self.config),
+                    Arc::new(self.metrics.clone()),
+                    partition,
+                    session_config,
+                )
+                .map_err(|e| ArrowError::ExternalError(Box::new(e))),
             )
-            .map_err(|e| ArrowError::ExternalError(Box::new(e))),
-        )
-        .try_flatten()
-        .inspect(move |batch| {
-            metric_total_bytes.add(
-                batch
-                    .as_ref()
-                    .map(|b| b.get_array_memory_size())
-                    .unwrap_or(0),
-            );
-
-            metric_row_count.add(batch.as_ref().map(|b| 
b.num_rows()).unwrap_or(0));
-        });
-
-        let schema = self.schema();
-        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
+            .try_flatten()
+            .inspect(move |batch| {
+                metric_total_bytes.add(
+                    batch
+                        .as_ref()
+                        .map(|b| b.get_array_memory_size())
+                        .unwrap_or(0),
+                );
+
+                metric_row_count.add(batch.as_ref().map(|b| 
b.num_rows()).unwrap_or(0));
+            });
+
+            let schema = self.schema();
+            Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
+        } else {

Review Comment:
   The bodies of `if` and `else` look very similar. Could the logic be shared?



##########
ballista/core/src/config.rs:
##########
@@ -156,7 +158,11 @@ static CONFIG_ENTRIES: LazyLock<HashMap<String, 
ConfigEntry>> = LazyLock::new(||
         ConfigEntry::new(BALLISTA_SHUFFLE_SORT_BASED_BATCH_SIZE.to_string(),
                          "Target batch size in rows for coalescing small 
batches in sort shuffle".to_string(),
                          DataType::UInt64,
-                         Some((8192).to_string()))
+                         Some((8192).to_string())),
+        ConfigEntry::new(BALLISTA_CLIENT_PULL.to_string(),
+                         "Should client employ pull or push job tracking. In 
pull mode client will make a request to server in the loop, until job finishes. 
Pull mode is keep for legacy clients.".to_string(),

Review Comment:
   ```suggestion
                            "Should client employ pull or push job tracking. In 
pull mode client will make a request to server in the loop, until job finishes. 
Pull mode is kept for legacy clients.".to_string(),
   ```



##########
ballista/scheduler/src/scheduler_server/grpc.rs:
##########
@@ -351,83 +362,107 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerGrpc
                 .and_then(|s| s.value.clone())
                 .unwrap_or_default();
 
-            let job_id = self.state.task_manager.generate_job_id();
+            info!(
+                "execution query (PUSH) job received - session_id: 
{session_id}, operation_id: {operation_id}, job_name: {job_name}"
+            );
+
+            let (_session_id, session_ctx) = self

Review Comment:
   AFAICS the `session_id` is used later in an `info!()` call.
   So, here you can remove the `.clone()` at line 370 and _move_ the earlier 
variable, and remove the leading `_` from the returned variable so that it can 
be used for the logging/tracing.



##########
ballista/scheduler/src/cluster/memory.rs:
##########
@@ -450,12 +472,29 @@ impl JobState for InMemoryJobState {
             status.status,
             Some(Status::Successful(_)) | Some(Status::Failed(_))
         ) {
+            if let Some((_, job_info)) = self.running_jobs.remove(job_id) {
+                job_info.update_subscribers(status.clone()).await;
+            }
+
             self.completed_jobs
                 .insert(job_id.to_string(), (status.clone(), 
Some(graph.cloned())));
-            self.running_jobs.remove(job_id);
         } else {
             // otherwise update running job
-            self.running_jobs.insert(job_id.to_string(), status.clone());
+            let subscriber = if let Some(mut job_info) = 
self.running_jobs.get_mut(job_id)
+            {
+                job_info.status = status.clone();
+                // we're cloning subscriber not to await in lock
+                job_info.subscriber.clone()
+            } else {
+                Err(BallistaError::Internal(format!(
+                    "scheduler state can't find job: {}",
+                    job_id
+                )))?
+            };
+
+            if let Some(subscriber) = subscriber {
+                let _ = subscriber.send(status.clone()).await;

Review Comment:
   Same here: consider spawning a new task for the send because this may block 
the sender if the receiver is slow for some reason. Or use `try_send()` if 
stopping early would be better.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to