This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 226a2b8  Queue support for DaskExecutor using Dask Worker Resources (#16829)
226a2b8 is described below

commit 226a2b8c33d28cd391717191efb4593951d1f90c
Author: aa1371 <[email protected]>
AuthorDate: Tue Aug 31 23:32:59 2021 -1000

    Queue support for DaskExecutor using Dask Worker Resources (#16829)
---
 airflow/executors/dask_executor.py    | 17 ++++++++-
 docs/apache-airflow/executor/dask.rst |  6 ++-
 tests/executors/test_dask_executor.py | 72 +++++++++++++++++++++++++++++++++--
 3 files changed, 88 insertions(+), 7 deletions(-)

diff --git a/airflow/executors/dask_executor.py b/airflow/executors/dask_executor.py
index f26ad0e..5a3eca8 100644
--- a/airflow/executors/dask_executor.py
+++ b/airflow/executors/dask_executor.py
@@ -33,6 +33,10 @@ from airflow.exceptions import AirflowException
 from airflow.executors.base_executor import NOT_STARTED_MESSAGE, BaseExecutor, CommandType
 from airflow.models.taskinstance import TaskInstanceKey
 
+# queue="default" is a special case since this is the base config default 
queue name,
+# with respect to DaskExecutor, treat it as if no queue is provided
+_UNDEFINED_QUEUES = {None, 'default'}
+
 
 class DaskExecutor(BaseExecutor):
     """DaskExecutor submits tasks to a Dask Distributed cluster."""
@@ -81,7 +85,18 @@ class DaskExecutor(BaseExecutor):
         if not self.client:
             raise AirflowException(NOT_STARTED_MESSAGE)
 
-        future = self.client.submit(airflow_run, pure=False)
+        resources = None
+        if queue not in _UNDEFINED_QUEUES:
+            scheduler_info = self.client.scheduler_info()
+            avail_queues = {
+                resource for d in scheduler_info['workers'].values() for resource in d['resources']
+            }
+
+            if queue not in avail_queues:
+                raise AirflowException(f"Attempted to submit task to an 
unavailable queue: '{queue}'")
+            resources = {queue: 1}
+
+        future = self.client.submit(airflow_run, pure=False, resources=resources)
         self.futures[future] = key  # type: ignore
 
     def _process_future(self, future: Future) -> None:
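
For context, the mapping above relies on Dask's standard worker-resource restriction: a task submitted with resources={queue: 1} will only be scheduled on workers that advertise a resource of that name. A minimal standalone sketch of that mechanism (not part of this commit; the cluster setup and task function are illustrative):

    # Sketch of the worker-resource restriction the executor relies on.
    # Not part of this commit; names here are illustrative only.
    from dask.distributed import Client, LocalCluster

    # Workers in this cluster advertise a "QUEUE1" resource, mirroring
    # `dask-worker ... --resources="QUEUE1=..."` on a real deployment.
    cluster = LocalCluster(resources={'QUEUE1': 1})
    client = Client(cluster)

    def task():
        return 'ran on a QUEUE1 worker'

    # resources={'QUEUE1': 1} restricts scheduling to workers that
    # advertise QUEUE1 -- the same dict execute_async builds from `queue`.
    future = client.submit(task, pure=False, resources={'QUEUE1': 1})
    print(future.result())

    client.close()
    cluster.close()

Because each submitted task consumes one unit of the named resource, a finite limit caps how many queued tasks a worker runs concurrently; advertising a limit of inf, as the docs change below recommends, means the queue itself never throttles the worker.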
diff --git a/docs/apache-airflow/executor/dask.rst b/docs/apache-airflow/executor/dask.rst
index 6991797..0d0a680 100644
--- a/docs/apache-airflow/executor/dask.rst
+++ b/docs/apache-airflow/executor/dask.rst
@@ -50,5 +50,7 @@ Please note:
 
 - Each Dask worker must be able to import Airflow and any dependencies you
   require.
-- Dask does not support queues. If an Airflow task was created with a queue, a
-  warning will be raised but the task will be submitted to the cluster.
+- The DaskExecutor implements queues using
+  `Dask Worker Resources <https://distributed.dask.org/en/latest/resources.html>`_ functionality. To enable the use of
+  queues, start your Dask workers with resources of the same name as the desired queues and a limit of ``inf``.
+  E.g. ``dask-worker <scheduler_address> --resources="QUEUE1=inf,QUEUE2=inf"``.
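
On the Airflow side, a task opts into a queue through the standard queue argument that every operator accepts; with this change, the DaskExecutor translates it into the resource restriction shown above. A hedged sketch (the DAG and task ids are hypothetical, and it assumes workers started with --resources="QUEUE1=inf"):

    # Hypothetical DAG illustrating queue assignment; ids are made up.
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.bash import BashOperator

    with DAG(dag_id='dask_queue_demo', start_date=datetime(2021, 8, 1),
             schedule_interval=None) as dag:
        # queue='QUEUE1' maps to the Dask worker resource of the same name;
        # queue='default' (or no queue at all) submits with no restriction.
        hello = BashOperator(task_id='hello', bash_command='echo hello',
                             queue='QUEUE1')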
diff --git a/tests/executors/test_dask_executor.py b/tests/executors/test_dask_executor.py
index 5239f41..668034f 100644
--- a/tests/executors/test_dask_executor.py
+++ b/tests/executors/test_dask_executor.py
@@ -21,6 +21,7 @@ from unittest import mock
 
 import pytest
 
+from airflow.exceptions import AirflowException
 from airflow.jobs.backfill_job import BackfillJob
 from airflow.models import DagBag
 from airflow.utils import timezone
@@ -40,18 +41,18 @@ except ImportError:
     skip_tls_tests = True
 
 DEFAULT_DATE = timezone.datetime(2017, 1, 1)
+SUCCESS_COMMAND = ['airflow', 'tasks', 'run', '--help']
+FAIL_COMMAND = ['airflow', 'tasks', 'run', 'false']
 
 
 class TestBaseDask(unittest.TestCase):
     def assert_tasks_on_executor(self, executor):
 
-        success_command = ['airflow', 'tasks', 'run', '--help']
-        fail_command = ['airflow', 'tasks', 'run', 'false']
         # start the executor
         executor.start()
 
-        executor.execute_async(key='success', command=success_command)
-        executor.execute_async(key='fail', command=fail_command)
+        executor.execute_async(key='success', command=SUCCESS_COMMAND)
+        executor.execute_async(key='fail', command=FAIL_COMMAND)
 
         success_future = next(k for k, v in executor.futures.items() if v == 'success')
         fail_future = next(k for k, v in executor.futures.items() if v == 'fail')
@@ -145,3 +146,66 @@ class TestDaskExecutorTLS(TestBaseDask):
             mock.call('executor.running_tasks', mock.ANY),
         ]
         mock_stats_gauge.assert_has_calls(calls)
+
+
+class TestDaskExecutorQueue(unittest.TestCase):
+    def test_dask_queues_no_resources(self):
+        self.cluster = LocalCluster()
+        executor = DaskExecutor(cluster_address=self.cluster.scheduler_address)
+        executor.start()
+
+        with self.assertRaises(AirflowException):
+            executor.execute_async(key='success', command=SUCCESS_COMMAND, queue='queue1')
+
+    def test_dask_queues_not_available(self):
+        self.cluster = LocalCluster(resources={'queue1': 1})
+        executor = DaskExecutor(cluster_address=self.cluster.scheduler_address)
+        executor.start()
+
+        with self.assertRaises(AirflowException):
+            # resource 'queue2' doesn't exist on cluster
+            executor.execute_async(key='success', command=SUCCESS_COMMAND, queue='queue2')
+
+    def test_dask_queues(self):
+        self.cluster = LocalCluster(resources={'queue1': 1})
+        executor = DaskExecutor(cluster_address=self.cluster.scheduler_address)
+        executor.start()
+
+        executor.execute_async(key='success', command=SUCCESS_COMMAND, queue='queue1')
+        success_future = next(k for k, v in executor.futures.items() if v == 'success')
+
+        # wait for the futures to execute, with a timeout
+        timeout = timezone.utcnow() + timedelta(seconds=30)
+        while not success_future.done():
+            if timezone.utcnow() > timeout:
+                raise ValueError(
+                    'The futures should have finished; there is probably '
+                    'an error communicating with the Dask cluster.'
+                )
+
+        assert success_future.done()
+        assert success_future.exception() is None
+
+    def test_dask_queues_no_queue_specified(self):
+        self.cluster = LocalCluster(resources={'queue1': 1})
+        executor = DaskExecutor(cluster_address=self.cluster.scheduler_address)
+        executor.start()
+
+        # no queue specified for executing task
+        executor.execute_async(key='success', command=SUCCESS_COMMAND)
+        success_future = next(k for k, v in executor.futures.items() if v == 'success')
+
+        # wait for the futures to execute, with a timeout
+        timeout = timezone.utcnow() + timedelta(seconds=30)
+        while not success_future.done():
+            if timezone.utcnow() > timeout:
+                raise ValueError(
+                    'The futures should have finished; there is probably '
+                    'an error communicating with the Dask cluster.'
+                )
+
+        assert success_future.done()
+        assert success_future.exception() is None
+
+    def tearDown(self):
+        self.cluster.close(timeout=5)
