danielhumanmod commented on code in PR #1536:
URL:
https://github.com/apache/datafusion-ballista/pull/1536#discussion_r3020027771
##########
ballista/core/src/execution_plans/distributed_query.rs:
##########
@@ -588,204 +579,16 @@ async fn execute_query_push(
partition_location,
..
})) => {
- // Calculate job execution time (server-side execution)
- let job_execution_ms = ended_at.saturating_sub(started_at);
- let duration = Duration::from_millis(job_execution_ms);
-
- info!("Job {job_id} finished executing in {duration:?} ");
-
- // Calculate scheduling time (server-side queue time)
- // This includes network latency and actual queue time
- let scheduling_ms = started_at.saturating_sub(queued_at);
-
- // Calculate total query time (end-to-end from client
perspective)
- let total_elapsed = query_start_time.elapsed();
- let total_ms = total_elapsed.as_millis();
-
- // Set timing metrics
- let metric_job_execution = MetricBuilder::new(&metrics)
- .gauge("job_execution_time_ms", partition);
- metric_job_execution.set(job_execution_ms as usize);
-
- let metric_scheduling =
- MetricBuilder::new(&metrics).gauge("job_scheduling_in_ms",
partition);
- metric_scheduling.set(scheduling_ms as usize);
-
- let metric_total_time =
- MetricBuilder::new(&metrics).gauge("total_query_time_ms",
partition);
- metric_total_time.set(total_ms as usize);
-
- // Note: data_transfer_time_ms is not set here because
partition fetching
- // happens lazily when the stream is consumed, not during
execute_query.
- // This could be added in a future enhancement by wrapping the
stream.
-
- let streams = partition_location.into_iter().map(move
|partition| {
- let f = fetch_partition(
- partition,
- max_message_size,
- true,
- scheduler_url.clone(),
- flight_proxy.clone(),
- customize_endpoint.clone(),
- use_tls,
- )
- .map_err(|e| ArrowError::ExternalError(Box::new(e)));
-
- futures::stream::once(f).try_flatten()
+ break Ok(CompletedJob {
+ job_id,
+ partition_location,
+ flight_proxy,
+ queued_at,
+ started_at,
+ ended_at,
+ query_start_time,
});
-
- break Ok(futures::stream::iter(streams).flatten());
}
};
}
}
-
-fn get_client_host_port(
Review Comment:
migrate to the fetch result handler too
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]