Repository: incubator-airflow Updated Branches: refs/heads/v1-8-test c6a09c47e -> 7cff6cde4
[AIRFLOW-1308] Disable nanny usage for Dask Nanny is deprecated and results in build errors. Closes #2366 from bolkedebruin/fix_dask (cherry picked from commit 10826711846f06476d343b150412105489096179) Signed-off-by: Bolke de Bruin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/7cff6cde Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/7cff6cde Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/7cff6cde Branch: refs/heads/v1-8-test Commit: 7cff6cde4b50eb96595bb28e05b2fce99752abbf Parents: c6a09c4 Author: Bolke de Bruin <[email protected]> Authored: Thu Jun 15 09:44:16 2017 -0400 Committer: Bolke de Bruin <[email protected]> Committed: Thu Jun 15 09:46:36 2017 -0400 ---------------------------------------------------------------------- tests/executors/dask_executor.py | 114 ++++++++++++++++++++++++++++++++++ 1 file changed, 114 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7cff6cde/tests/executors/dask_executor.py ---------------------------------------------------------------------- diff --git a/tests/executors/dask_executor.py b/tests/executors/dask_executor.py new file mode 100644 index 0000000..f66a272 --- /dev/null +++ b/tests/executors/dask_executor.py @@ -0,0 +1,114 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import logging
import time
import unittest

from airflow import configuration
from airflow.models import DAG, DagBag, TaskInstance, State
from airflow.jobs import BackfillJob
from airflow.operators.python_operator import PythonOperator

try:
    from airflow.executors.dask_executor import DaskExecutor
    from distributed import LocalCluster
    SKIP_DASK = False
except ImportError:
    logging.error('Dask unavailable, skipping DaskExecutor tests')
    SKIP_DASK = True

# sqlite cannot serve the concurrent access these executor tests generate,
# so they are skipped for sqlite-backed configurations as well.
if 'sqlite' in configuration.get('core', 'sql_alchemy_conn'):
    logging.error('sqlite does not support concurrent access')
    SKIP_DASK = True

DEFAULT_DATE = datetime.datetime(2017, 1, 1)


class DaskExecutorTest(unittest.TestCase):
    """Integration tests running DaskExecutor against a local Dask cluster."""

    def setUp(self):
        self.dagbag = DagBag(include_examples=True)

    @unittest.skipIf(SKIP_DASK, 'Dask unsupported by this configuration')
    def test_dask_executor_functions(self):
        """
        Submit one succeeding and one failing shell command through the
        executor and verify that only the failing one produced an exception.
        """
        cluster = LocalCluster()

        executor = DaskExecutor(cluster_address=cluster.scheduler_address)

        # start the executor
        executor.start()

        success_command = 'echo 1'
        fail_command = 'exit 1'

        executor.execute_async(key='success', command=success_command)
        executor.execute_async(key='fail', command=fail_command)

        # executor.futures maps future -> key, so look the futures up by key
        success_future = next(
            k for k, v in executor.futures.items() if v == 'success')
        fail_future = next(
            k for k, v in executor.futures.items() if v == 'fail')

        # wait for the futures to execute, with a timeout
        timeout = datetime.datetime.now() + datetime.timedelta(seconds=30)
        while not (success_future.done() and fail_future.done()):
            if datetime.datetime.now() > timeout:
                raise ValueError(
                    'The futures should have finished; there is probably '
                    'an error communicating with the Dask cluster.')
            # poll politely instead of busy-waiting on the cluster
            time.sleep(0.5)

        # both tasks should have finished
        self.assertTrue(success_future.done())
        self.assertTrue(fail_future.done())

        # check task exceptions
        self.assertTrue(success_future.exception() is None)
        self.assertTrue(fail_future.exception() is not None)

        cluster.close()

    @unittest.skipIf(SKIP_DASK, 'Dask unsupported by this configuration')
    def test_backfill_integration(self):
        """
        Test that DaskExecutor can be used to backfill example dags
        """
        cluster = LocalCluster()

        dags = [
            dag for dag in self.dagbag.dags.values()
            if dag.dag_id in [
                'example_bash_operator',
                # 'example_python_operator',
            ]
        ]

        for dag in dags:
            dag.clear(
                start_date=DEFAULT_DATE,
                end_date=DEFAULT_DATE)

        # run each selected dag in a deterministic order, one BackfillJob each
        for dag in sorted(dags, key=lambda d: d.dag_id):
            job = BackfillJob(
                dag=dag,
                start_date=DEFAULT_DATE,
                end_date=DEFAULT_DATE,
                ignore_first_depends_on_past=True,
                executor=DaskExecutor(
                    cluster_address=cluster.scheduler_address))
            job.run()

        cluster.close()
