martin-g commented on code in PR #1893:
URL: https://github.com/apache/datafusion-ballista/pull/1893#discussion_r3480286097


##########
ballista/scheduler/src/config.rs:
##########
@@ -34,6 +34,16 @@ use datafusion_proto::physical_plan::PhysicalExtensionCodec;
 use std::fmt::Display;
 use std::sync::Arc;
 
+/// Callback invoked when new work becomes available for executors.
+///
+/// This is called after:
+/// - A job is submitted and tasks are ready to be scheduled
+/// - Tasks complete and new stages become runnable
+///
+/// This allows external systems to notify executors to poll immediately
+/// rather than waiting for their next poll interval.

Review Comment:
   ```suggestion
   /// rather than waiting for their next poll interval.
   ///
   /// # Warning
   ///
   /// This callback is executed synchronously within the scheduler's main event loop.
   /// Implementations **must be non-blocking** and should offload any blocking or
   /// long-running operations (such as network I/O) to a separate task or thread.
   ```
   
   The callback is synchronous, so a blocking implementation could badly affect the Scheduler's work.
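A minimal std-only sketch of a callback that honours this warning: the callback itself only enqueues the message with `try_send`, and a worker thread does the potentially slow delivery. Only the `OnWorkAvailableFn` alias comes from the PR; `non_blocking_callback` and its `deliver` parameter are hypothetical names, and a real deployment would more likely use a Tokio task and channel.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};
use std::sync::Arc;
use std::thread;

/// Callback alias as introduced in the PR.
pub type OnWorkAvailableFn = Arc<dyn Fn(&str) + Send + Sync>;

/// Hypothetical helper: returns a callback that never blocks the caller, plus
/// the handle of the worker thread that performs the (possibly slow) delivery.
pub fn non_blocking_callback<D>(
    capacity: usize,
    deliver: D,
) -> (OnWorkAvailableFn, thread::JoinHandle<()>)
where
    D: Fn(String) + Send + 'static,
{
    // Bounded queue between the scheduler thread and the delivery worker.
    let (tx, rx) = sync_channel::<String>(capacity);

    // The worker drains the queue; the loop ends once the callback (and with
    // it the last sender) has been dropped.
    let worker = thread::spawn(move || {
        for msg in rx {
            deliver(msg);
        }
    });

    let callback: OnWorkAvailableFn = Arc::new(move |msg: &str| {
        // `try_send` returns immediately instead of waiting for free capacity.
        match tx.try_send(msg.to_string()) {
            Ok(()) => {}
            // Queue full: drop the hint; executors still poll on their own.
            Err(TrySendError::Full(_)) => {}
            // Worker has stopped: there is nowhere left to deliver to.
            Err(TrySendError::Disconnected(_)) => {}
        }
    });

    (callback, worker)
}
```

Dropping a notification when the queue is full is acceptable here because it is only a hint: executors still fall back to their regular poll interval.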



##########
ballista/scheduler/src/config.rs:
##########
@@ -546,6 +560,7 @@ impl TryFrom<Config> for SchedulerConfig {
             cors_allowed_origins: opt.cors_allowed_origins,
             #[cfg(feature = "rest-api")]
             cors_allowed_methods: opt.cors_allowed_methods,
+            on_work_available: None,

Review Comment:
   How is this new setting supposed to be set?
   I'd expect at least a new `with_on_work_available` setter to be added if there is no new command line option.
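A minimal sketch of such a setter, assuming a consuming builder style. The struct below is a hypothetical, trimmed-down stand-in for `SchedulerConfig` (the real one has many more fields); only `on_work_available` and `OnWorkAvailableFn` come from the PR, and the setter signature is an assumption.

```rust
use std::sync::Arc;

/// Callback alias as introduced in the PR.
pub type OnWorkAvailableFn = Arc<dyn Fn(&str) + Send + Sync>;

/// Hypothetical, trimmed-down stand-in for `SchedulerConfig`.
#[derive(Clone, Default)]
pub struct SchedulerConfig {
    /// Optional hook fired when new work becomes available.
    pub on_work_available: Option<OnWorkAvailableFn>,
}

impl SchedulerConfig {
    /// Builder-style setter suggested in the review (hypothetical signature).
    pub fn with_on_work_available<F>(mut self, callback: F) -> Self
    where
        F: Fn(&str) + Send + Sync + 'static,
    {
        // Wrap the caller's closure so they don't have to build the Arc.
        let callback: OnWorkAvailableFn = Arc::new(callback);
        self.on_work_available = Some(callback);
        self
    }
}
```

Accepting a generic `F: Fn(&str) + Send + Sync + 'static` and wrapping it internally keeps the `Arc` an implementation detail of the config.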



##########
ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs:
##########
@@ -295,6 +300,13 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
                                 .await?;
                         }
 
+                        // Notify external systems when new stages become runnable
+                        if !stage_events.is_empty()
+                            && let Some(ref callback) = self.config.on_work_available
+                        {
+                            callback("tasks_completed:new_stages_runnable");
+                        }
+

Review Comment:
   The `stage_events` are not necessarily "new stages are runnable". They might be `JobRunningFailed` or `JobFinished`, neither of which represents new runnable work.
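A sketch of a stricter guard, assuming the callback should fire only when at least one event can still carry runnable work. `StageEvent` and `has_runnable_work` are hypothetical simplifications of `QueryStageSchedulerEvent`; which real variants signal newly runnable stages would need to be checked against the scheduler code (here `JobUpdated` is assumed to be the only one).

```rust
/// Hypothetical, simplified mirror of the events produced by a task status
/// update; the real type is `QueryStageSchedulerEvent`.
#[derive(Debug, Clone, PartialEq)]
pub enum StageEvent {
    /// The job is still running; assumed here to be the only variant that can
    /// come with newly runnable stages.
    JobUpdated(String),
    /// Terminal: the job completed, so there is nothing new to run.
    JobFinished(String),
    /// Terminal: the job failed, so there is nothing new to run.
    JobRunningFailed(String),
}

/// Returns true only if at least one event may carry new runnable work.
pub fn has_runnable_work(events: &[StageEvent]) -> bool {
    events
        .iter()
        .any(|event| matches!(event, StageEvent::JobUpdated(_)))
}
```

With a helper like this, `!stage_events.is_empty()` in the condition would be replaced by a check that ignores terminal events.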



##########
ballista/executor/src/execution_loop.rs:
##########
@@ -204,7 +208,20 @@ where
         }
 
         if !active_job {
-            tokio::time::sleep(Duration::from_millis(50)).await;
+            // Wait for either the poll interval or a poll_now notification
+            match &poll_now_notify {
+                Some(notify) => {
+                    tokio::select! {
+                        () = tokio::time::sleep(Duration::from_millis(50)) => {}

Review Comment:
   The duration here could be longer.
   The idea is that `notify` will notify us as soon as there is some work, right?
   So there is no need to poll this frequently if an external mechanism will notify us when to do it.
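A std-only sketch of "wake immediately on a notification, otherwise fall back to a long poll interval". `PollSignal` is a hypothetical synchronous stand-in for `tokio::select!` over `Notify::notified()` and `tokio::time::sleep`; like the permit stored by `tokio::sync::Notify::notify_one`, it remembers a notification that arrives while nobody is waiting. The 5 s fallback is an assumed value, not one from the PR.

```rust
use std::sync::{Condvar, Mutex};
use std::time::Duration;

/// Hypothetical std-only stand-in for `tokio::sync::Notify` + `tokio::select!`.
/// A notification sent while nobody is waiting is remembered until the next wait.
#[derive(Default)]
pub struct PollSignal {
    pending: Mutex<bool>,
    cv: Condvar,
}

impl PollSignal {
    /// Called by the external mechanism that knows new work is available.
    pub fn notify(&self) {
        *self.pending.lock().unwrap() = true;
        self.cv.notify_one();
    }

    /// Waits until notified or until `fallback` elapses, whichever comes first.
    /// Returns `true` if woken by a notification, `false` on timeout.
    pub fn wait(&self, fallback: Duration) -> bool {
        let guard = self.pending.lock().unwrap();
        // `wait_timeout_while` also absorbs spurious wake-ups.
        let (mut pending, _timed_out) = self
            .cv
            .wait_timeout_while(guard, fallback, |pending| !*pending)
            .unwrap();
        // Consume the notification so the next call blocks again.
        std::mem::replace(&mut *pending, false)
    }
}

/// Hypothetical fallback interval: much longer than 50 ms, because the
/// notification path is what normally wakes the executor.
pub const FALLBACK_POLL_INTERVAL: Duration = Duration::from_secs(5);
```

The fallback only matters when a notification is lost (e.g. the external system is down), so it trades idle CPU wake-ups against worst-case pickup latency in that failure case.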



##########
ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs:
##########
@@ -295,6 +300,13 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
                                 .await?;
                         }
 
+                        // Notify external systems when new stages become runnable
+                        if !stage_events.is_empty()
+                            && let Some(ref callback) = self.config.on_work_available
+                        {
+                            callback("tasks_completed:new_stages_runnable");
+                        }
+

Review Comment:
   Shouldn't this be done after line 312? I.e. after posting the events.
   Currently this tells executors to poll, but it seems too early.
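A sketch of the suggested ordering, assuming events are posted over a channel: every event is sent first and the callback fires last, so executors are never told to poll before the work has reached the event loop. `SchedulerEvent` and `post_then_notify` are hypothetical; a std `mpsc::Sender` stands in for the scheduler's event sender. Note this only guarantees the events are queued, not that they have been processed.

```rust
use std::sync::mpsc::{SendError, Sender};

/// Hypothetical, simplified scheduler event.
#[derive(Debug, Clone, PartialEq)]
pub enum SchedulerEvent {
    JobUpdated(String),
}

/// Posts every event first and fires the callback only afterwards, so the
/// "poll now" hint is never sent before the events are queued.
pub fn post_then_notify(
    events: Vec<SchedulerEvent>,
    sender: &Sender<SchedulerEvent>,
    on_work_available: Option<&dyn Fn(&str)>,
) -> Result<(), SendError<SchedulerEvent>> {
    let had_events = !events.is_empty();
    for event in events {
        sender.send(event)?;
    }
    if had_events {
        if let Some(callback) = on_work_available {
            callback("tasks_completed:new_stages_runnable");
        }
    }
    Ok(())
}
```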



##########
ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs:
##########
@@ -180,6 +180,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
                         .post_event(QueryStageSchedulerEvent::ReviveOffers)
                         .await?;
                 }
+
+                // Notify external systems that new work is available
+                if let Some(ref callback) = self.config.on_work_available {
+                    callback(&format!("job_submitted:{job_id}"));

Review Comment:
   The `event_sender.post_event(QueryStageSchedulerEvent::ReviveOffers)` call is non-blocking, so the event might still be in the channel when this callback is executed, i.e. the first poll may find no tasks to run.
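One way around this race, sketched under the assumption that the notification can move into the event loop itself: fire the callback only after `ReviveOffers` has actually been dequeued and handled. `Event`, `run_event_loop` and the `"offers_revived"` message are hypothetical simplifications; a std `mpsc` channel stands in for the real event channel.

```rust
use std::sync::mpsc::Receiver;

/// Hypothetical, simplified stand-in for `QueryStageSchedulerEvent`.
pub enum Event {
    JobSubmitted(String),
    ReviveOffers,
}

/// Simplified event loop: the callback fires only once `ReviveOffers` has
/// been dequeued and handled, never while it still sits in the channel.
/// Returns how many times offers were revived.
pub fn run_event_loop(rx: Receiver<Event>, on_work_available: &dyn Fn(&str)) -> usize {
    let mut revived = 0;
    // `recv` fails once all senders are dropped, which ends the loop.
    while let Ok(event) = rx.recv() {
        match event {
            Event::JobSubmitted(job_id) => {
                // Planning would happen here; no hint yet, nothing is runnable.
                let _ = job_id;
            }
            Event::ReviveOffers => {
                // ... the offer / task-assignment step would run here ...
                revived += 1;
                // Hypothetical message; the hint is sent after handling.
                on_work_available("offers_revived");
            }
        }
    }
    revived
}
```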



##########
ballista/scheduler/src/config.rs:
##########
@@ -34,6 +34,16 @@ use datafusion_proto::physical_plan::PhysicalExtensionCodec;
 use std::fmt::Display;
 use std::sync::Arc;
 
+/// Callback invoked when new work becomes available for executors.
+///
+/// This is called after:
+/// - A job is submitted and tasks are ready to be scheduled
+/// - Tasks complete and new stages become runnable
+///
+/// This allows external systems to notify executors to poll immediately
+/// rather than waiting for their next poll interval.
+pub type OnWorkAvailableFn = Arc<dyn Fn(&str) + Send + Sync>;

Review Comment:
   Is there a need to use an `Arc`?
   It could be a `Box`.
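A sketch of the trade-off, assuming the config struct has to stay `Clone`: `Box<dyn Fn>` works for a single owner but does not implement `Clone`, so a struct holding it can no longer derive `Clone`, whereas an `Arc` clones cheaply and shares the same closure. `ConfigFragment` and `OnWorkAvailableBoxed` are hypothetical; whether `SchedulerConfig` actually needs `Clone` has to be checked in the codebase.

```rust
use std::sync::Arc;

/// Box variant: single owner, but `Box<dyn Fn>` does not implement `Clone`.
pub type OnWorkAvailableBoxed = Box<dyn Fn(&str) + Send + Sync>;

/// Arc variant (as in the PR): cloning only bumps a reference count and the
/// clones share the same closure.
pub type OnWorkAvailableFn = Arc<dyn Fn(&str) + Send + Sync>;

/// Hypothetical config fragment. `#[derive(Clone)]` compiles because the
/// field is an `Arc`; with `OnWorkAvailableBoxed` it would not.
#[derive(Clone, Default)]
pub struct ConfigFragment {
    pub on_work_available: Option<OnWorkAvailableFn>,
}
```

If `Clone` is not required, the `Box` version is the simpler choice.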



##########
ballista/scheduler/src/config.rs:
##########
@@ -34,6 +34,16 @@ use datafusion_proto::physical_plan::PhysicalExtensionCodec;
 use std::fmt::Display;
 use std::sync::Arc;
 
+/// Callback invoked when new work becomes available for executors.
+///
+/// This is called after:
+/// - A job is submitted and tasks are ready to be scheduled
+/// - Tasks complete and new stages become runnable
+///
+/// This allows external systems to notify executors to poll immediately
+/// rather than waiting for their next poll interval.
+pub type OnWorkAvailableFn = Arc<dyn Fn(&str) + Send + Sync>;

Review Comment:
   The function receives a plain `&str` as a command/message. IMO it would be better to use an enum with all possible variants of the commands.
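A sketch of what such an enum could look like. `WorkAvailable` and its variants are hypothetical names derived from the two string messages the PR currently sends (`job_submitted:{job_id}` and `tasks_completed:new_stages_runnable`); implementing `Display` keeps the existing text form available for logging.

```rust
use std::fmt;
use std::sync::Arc;

/// Hypothetical enum replacing the free-form `&str` message.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum WorkAvailable {
    /// A job was submitted and its first tasks are ready to be scheduled.
    JobSubmitted { job_id: String },
    /// Completed tasks made one or more new stages runnable.
    NewStagesRunnable,
}

impl fmt::Display for WorkAvailable {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            WorkAvailable::JobSubmitted { job_id } => write!(f, "job_submitted:{job_id}"),
            WorkAvailable::NewStagesRunnable => f.write_str("tasks_completed:new_stages_runnable"),
        }
    }
}

/// Callback alias taking the typed event instead of a string.
pub type OnWorkAvailableFn = Arc<dyn Fn(&WorkAvailable) + Send + Sync>;
```

Marking the enum `#[non_exhaustive]` lets new variants be added later without breaking `match` statements in downstream crates.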



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
