fjetter commented on a change in pull request #16829:
URL: https://github.com/apache/airflow/pull/16829#discussion_r673811608



##########
File path: airflow/executors/dask_executor.py
##########
@@ -81,7 +81,22 @@ def airflow_run():
         if not self.client:
             raise AirflowException(NOT_STARTED_MESSAGE)
 
-        future = self.client.submit(airflow_run, pure=False)
+        resources = None
+        if queue is not None:
+            avail_resources = self.client.run_on_scheduler(
+                lambda dask_scheduler: dask_scheduler.resources
+            )

Review comment:
       > scheduler.get_resources()
   
   The scheduler will not have the method but the plugin has. By registering a 
handler you are registering an RPC with the scheduler. 
   The `client.scheduler` is, in fact, not the scheduler but a 
[PooledRPCCall](https://github.com/dask/distributed/blob/9975baa54b5d2079891e93f57ed8e0f402ef89d8/distributed/core.py#L840)
 which means that any attributes on that object are translated to an RPC to the 
scheduler. Therefore `client.scheduler.handler_name(handler_argument)` will 
execute the method registered under handler `handler_name` with argument 
`handler_argumnet`. In this specific case it should read
   
   ```
   client.scheduler.get_ressources() # There are no arguments here 
   ```
   
   In fact, you might need to wrap this with the sync method since technically 
the RPC call will be async. Try
   
   ```python
   client.sync(client.scheduler.get_resources())
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to