potiuk commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports URL: https://github.com/apache/airflow/pull/6596#discussion_r352741740
########## File path: airflow/executors/dask_executor.py ########## @@ -55,23 +58,28 @@ def start(self): else: security = None - self.client = distributed.Client(self.cluster_address, security=security) + self.client = Client(self.cluster_address, security=security) self.futures = {} - def execute_async(self, key, command, queue=None, executor_config=None): - if queue is not None: - warnings.warn( - 'DaskExecutor does not support queues. ' - 'All tasks will be run in the same cluster' - ) + def execute_async(self, + key: TaskInstanceKeyType, + command: CommandType, + queue: Optional[str] = None, + executor_config: Optional[Any] = None) -> None: + if not self.futures: + raise AirflowException("Executor should be started first.") def airflow_run(): return subprocess.check_call(command, close_fds=True) + if not self.client: + raise AirflowException("The Dask executor has not been started yet!") future = self.client.submit(airflow_run, pure=False) self.futures[future] = key - def _process_future(self, future): + def _process_future(self, future: Future) -> None: + if not self.futures: Review comment: Ah yeah. Good catch. I replaced it in in all cases where I used the NOT_STARTED message but I missed a few places where I left the original message. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services