potiuk commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports URL: https://github.com/apache/airflow/pull/6596#discussion_r353156308
##########
File path: airflow/executors/dask_executor.py
##########
@@ -16,36 +15,37 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
+"""Dask executor."""
 import subprocess
-import warnings
+from typing import Any, Dict, Optional
-import distributed
+from distributed import Client, Future, as_completed
+from distributed.security import Security
 from airflow.configuration import conf
-from airflow.executors.base_executor import BaseExecutor
+from airflow.executors.base_executor import NOT_STARTED_MESSAGE, BaseExecutor, CommandType
+from airflow.models.taskinstance import TaskInstanceKeyType
 class DaskExecutor(BaseExecutor):
     """
     DaskExecutor submits tasks to a Dask Distributed cluster.
     """
     def __init__(self, cluster_address=None):
+        super().__init__(parallelism=0)
         if cluster_address is None:
             cluster_address = conf.get('dask', 'cluster_address')
-        if not cluster_address:
-            raise ValueError(
-                'Please provide a Dask cluster address in airflow.cfg')
+        assert cluster_address, 'Please provide a Dask cluster address in airflow.cfg'
         self.cluster_address = cluster_address
         # ssl / tls parameters
         self.tls_ca = conf.get('dask', 'tls_ca')
         self.tls_key = conf.get('dask', 'tls_key')
         self.tls_cert = conf.get('dask', 'tls_cert')
-        super().__init__(parallelism=0)
+        self.client: Optional[Client] = None
+        self.futures: Optional[Dict[Optional[Future], TaskInstanceKeyType]] = None

Review comment:
True. Let's see what mypy thinks about it.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services