This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v1-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 179e93093b7b21fa60cf8d5984ec4fd0c6b52730 Author: yuqian90 <[email protected]> AuthorDate: Fri Feb 21 19:35:55 2020 +0800 [AIRFLOW-5391] Do not re-run skipped tasks when they are cleared (#7276) If a task is skipped by BranchPythonOperator, BaseBranchOperator or ShortCircuitOperator and the user then clears the skipped task later, it'll execute. This is probably not the right behaviour. This commit changes that so it will be skipped again. This can be ignored by running the task again with "Ignore Task Deps" override. (cherry picked from commit 1cdab56a6192f69962506b7ff632c986c84eb10d) --- UPDATING.md | 7 ++ airflow/models/baseoperator.py | 2 + airflow/models/skipmixin.py | 112 ++++++++++++----- airflow/ti_deps/dep_context.py | 27 ++++- airflow/ti_deps/deps/not_previously_skipped_dep.py | 88 ++++++++++++++ requirements/requirements-python2.7.txt | 67 ++++++----- requirements/requirements-python3.8.txt | 92 +++++++------- requirements/setup-2.7.md5 | 2 +- requirements/setup-3.8.md5 | 2 +- tests/jobs/test_scheduler_job.py | 39 ++++++ tests/operators/test_latest_only_operator.py | 60 ++++++---- tests/operators/test_python_operator.py | 119 +++++++++++++++++- .../deps/test_not_previously_skipped_dep.py | 133 +++++++++++++++++++++ tests/ti_deps/deps/test_trigger_rule_dep.py | 40 +++++++ 14 files changed, 655 insertions(+), 135 deletions(-) diff --git a/UPDATING.md b/UPDATING.md index 61734bb..f82ba10 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -25,6 +25,7 @@ assists users migrating to a new version. <!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE --> **Table of contents** +- [Airflow 1.10.12](#airflow-11012) - [Airflow 1.10.11](#airflow-11011) - [Airflow 1.10.10](#airflow-11010) - [Airflow 1.10.9](#airflow-1109) @@ -59,6 +60,12 @@ More tips can be found in the guide: https://developers.google.com/style/inclusive-documentation --> +## Airflow 1.10.12 + +### Clearing tasks skipped by SkipMixin will skip them + +Previously, when tasks skipped by SkipMixin (such as BranchPythonOperator, BaseBranchOperator and ShortCircuitOperator) are cleared, they execute. Since 1.10.12, when such skipped tasks are cleared, +they will be skipped again by the newly introduced NotPreviouslySkippedDep. ## Airflow 1.10.11 diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index 52037c5..266ad64 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -45,6 +45,7 @@ from airflow.models.pool import Pool from airflow.models.taskinstance import TaskInstance, clear_task_instances from airflow.models.xcom import XCOM_RETURN_KEY from airflow.ti_deps.deps.not_in_retry_period_dep import NotInRetryPeriodDep +from airflow.ti_deps.deps.not_previously_skipped_dep import NotPreviouslySkippedDep from airflow.ti_deps.deps.prev_dagrun_dep import PrevDagrunDep from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep from airflow.utils import timezone @@ -575,6 +576,7 @@ class BaseOperator(LoggingMixin): NotInRetryPeriodDep(), PrevDagrunDep(), TriggerRuleDep(), + NotPreviouslySkippedDep(), } @property diff --git a/airflow/models/skipmixin.py b/airflow/models/skipmixin.py index 57341d8..3b4531f 100644 --- a/airflow/models/skipmixin.py +++ b/airflow/models/skipmixin.py @@ -19,28 +19,28 @@ from airflow.models.taskinstance import TaskInstance from airflow.utils import timezone -from airflow.utils.db import provide_session +from airflow.utils.db import create_session, provide_session from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.state import State import six -from typing import Union, Iterable, Set +from typing import Set + +# The key used by SkipMixin to store XCom data. +XCOM_SKIPMIXIN_KEY = "skipmixin_key" + +# The dictionary key used to denote task IDs that are skipped +XCOM_SKIPMIXIN_SKIPPED = "skipped" + +# The dictionary key used to denote task IDs that are followed +XCOM_SKIPMIXIN_FOLLOWED = "followed" class SkipMixin(LoggingMixin): - @provide_session - def skip(self, dag_run, execution_date, tasks, session=None): + def _set_state_to_skipped(self, dag_run, execution_date, tasks, session): """ - Sets tasks instances to skipped from the same dag run. - - :param dag_run: the DagRun for which to set the tasks to skipped - :param execution_date: execution_date - :param tasks: tasks to skip (not task_ids) - :param session: db session to use + Used internally to set state of task instances to skipped from the same dag run. """ - if not tasks: - return - task_ids = [d.task_id for d in tasks] now = timezone.utcnow() @@ -48,12 +48,15 @@ class SkipMixin(LoggingMixin): session.query(TaskInstance).filter( TaskInstance.dag_id == dag_run.dag_id, TaskInstance.execution_date == dag_run.execution_date, - TaskInstance.task_id.in_(task_ids) - ).update({TaskInstance.state: State.SKIPPED, - TaskInstance.start_date: now, - TaskInstance.end_date: now}, - synchronize_session=False) - session.commit() + TaskInstance.task_id.in_(task_ids), + ).update( + { + TaskInstance.state: State.SKIPPED, + TaskInstance.start_date: now, + TaskInstance.end_date: now, + }, + synchronize_session=False, + ) else: assert execution_date is not None, "Execution date is None and no dag run" @@ -66,14 +69,56 @@ class SkipMixin(LoggingMixin): ti.end_date = now session.merge(ti) - session.commit() + @provide_session + def skip( + self, dag_run, execution_date, tasks, session=None, + ): + """ + Sets tasks instances to skipped from the same dag run. + + If this instance has a `task_id` attribute, store the list of skipped task IDs to XCom + so that NotPreviouslySkippedDep knows these tasks should be skipped when they + are cleared. - def skip_all_except(self, ti, branch_task_ids): - # type: (TaskInstance, Union[str, Iterable[str]]) -> None + :param dag_run: the DagRun for which to set the tasks to skipped + :param execution_date: execution_date + :param tasks: tasks to skip (not task_ids) + :param session: db session to use + """ + if not tasks: + return + + self._set_state_to_skipped(dag_run, execution_date, tasks, session) + session.commit() + + # SkipMixin may not necessarily have a task_id attribute. Only store to XCom if one is available. + try: + task_id = self.task_id + except AttributeError: + task_id = None + + if task_id is not None: + from airflow.models.xcom import XCom + + XCom.set( + key=XCOM_SKIPMIXIN_KEY, + value={XCOM_SKIPMIXIN_SKIPPED: [d.task_id for d in tasks]}, + task_id=task_id, + dag_id=dag_run.dag_id, + execution_date=dag_run.execution_date, + session=session + ) + + def skip_all_except( + self, ti, branch_task_ids + ): """ This method implements the logic for a branching operator; given a single task ID or list of task IDs to follow, this skips all other tasks immediately downstream of this operator. + + branch_task_ids is stored to XCom so that NotPreviouslySkippedDep knows skipped tasks or + newly added tasks should be skipped when they are cleared. """ self.log.info("Following branch %s", branch_task_ids) if isinstance(branch_task_ids, six.string_types): @@ -90,13 +135,22 @@ class SkipMixin(LoggingMixin): # is also a downstream task of the branch task, we exclude it from skipping. branch_downstream_task_ids = set() # type: Set[str] for b in branch_task_ids: - branch_downstream_task_ids.update(dag. - get_task(b). - get_flat_relative_ids(upstream=False)) + branch_downstream_task_ids.update( + dag.get_task(b).get_flat_relative_ids(upstream=False) + ) - skip_tasks = [t for t in downstream_tasks - if t.task_id not in branch_task_ids and - t.task_id not in branch_downstream_task_ids] + skip_tasks = [ + t + for t in downstream_tasks + if t.task_id not in branch_task_ids + and t.task_id not in branch_downstream_task_ids + ] self.log.info("Skipping tasks %s", [t.task_id for t in skip_tasks]) - self.skip(dag_run, ti.execution_date, skip_tasks) + with create_session() as session: + self._set_state_to_skipped( + dag_run, ti.execution_date, skip_tasks, session=session + ) + ti.xcom_push( + key=XCOM_SKIPMIXIN_KEY, value={XCOM_SKIPMIXIN_FOLLOWED: branch_task_ids} + ) diff --git a/airflow/ti_deps/dep_context.py b/airflow/ti_deps/dep_context.py index c5d999a..74307e4 100644 --- a/airflow/ti_deps/dep_context.py +++ b/airflow/ti_deps/dep_context.py @@ -66,6 +66,8 @@ class DepContext(object): :type ignore_task_deps: bool :param ignore_ti_state: Ignore the task instance's previous failure/success :type ignore_ti_state: bool + :param finished_tasks: A list of all the finished tasks of this run + :type finished_tasks: list[airflow.models.TaskInstance] """ def __init__( self, @@ -76,7 +78,8 @@ class DepContext(object): ignore_in_retry_period=False, ignore_in_reschedule_period=False, ignore_task_deps=False, - ignore_ti_state=False): + ignore_ti_state=False, + finished_tasks=None): self.deps = deps or set() self.flag_upstream_failed = flag_upstream_failed self.ignore_all_deps = ignore_all_deps @@ -85,6 +88,28 @@ class DepContext(object): self.ignore_in_reschedule_period = ignore_in_reschedule_period self.ignore_task_deps = ignore_task_deps self.ignore_ti_state = ignore_ti_state + self.finished_tasks = finished_tasks + + def ensure_finished_tasks(self, dag, execution_date, session): + """ + This method makes sure finished_tasks is populated if it's currently None. + This is for the strange feature of running tasks without dag_run. + + :param dag: The DAG for which to find finished tasks + :type dag: airflow.models.DAG + :param execution_date: The execution_date to look for + :param session: Database session to use + :return: A list of all the finished tasks of this DAG and execution_date + :rtype: list[airflow.models.TaskInstance] + """ + if self.finished_tasks is None: + self.finished_tasks = dag.get_task_instances( + start_date=execution_date, + end_date=execution_date, + state=State.finished() + [State.UPSTREAM_FAILED], + session=session, + ) + return self.finished_tasks # In order to be able to get queued a task must have one of these states diff --git a/airflow/ti_deps/deps/not_previously_skipped_dep.py b/airflow/ti_deps/deps/not_previously_skipped_dep.py new file mode 100644 index 0000000..34ff6ac --- /dev/null +++ b/airflow/ti_deps/deps/not_previously_skipped_dep.py @@ -0,0 +1,88 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from airflow.ti_deps.deps.base_ti_dep import BaseTIDep + + +class NotPreviouslySkippedDep(BaseTIDep): + """ + Determines if any of the task's direct upstream relatives have decided this task should + be skipped. + """ + + NAME = "Not Previously Skipped" + IGNORABLE = True + IS_TASK_DEP = True + + def _get_dep_statuses( + self, ti, session, dep_context + ): # pylint: disable=signature-differs + from airflow.models.skipmixin import ( + SkipMixin, + XCOM_SKIPMIXIN_KEY, + XCOM_SKIPMIXIN_SKIPPED, + XCOM_SKIPMIXIN_FOLLOWED, + ) + from airflow.utils.state import State + + upstream = ti.task.get_direct_relatives(upstream=True) + + finished_tasks = dep_context.ensure_finished_tasks( + ti.task.dag, ti.execution_date, session + ) + + finished_task_ids = {t.task_id for t in finished_tasks} + + for parent in upstream: + if isinstance(parent, SkipMixin): + if parent.task_id not in finished_task_ids: + # This can happen if the parent task has not yet run. + continue + + prev_result = ti.xcom_pull( + task_ids=parent.task_id, key=XCOM_SKIPMIXIN_KEY + ) + + if prev_result is None: + # This can happen if the parent task has not yet run. + continue + + should_skip = False + if ( + XCOM_SKIPMIXIN_FOLLOWED in prev_result + and ti.task_id not in prev_result[XCOM_SKIPMIXIN_FOLLOWED] + ): + # Skip any tasks that are not in "followed" + should_skip = True + elif ( + XCOM_SKIPMIXIN_SKIPPED in prev_result + and ti.task_id in prev_result[XCOM_SKIPMIXIN_SKIPPED] + ): + # Skip any tasks that are in "skipped" + should_skip = True + + if should_skip: + # If the parent SkipMixin has run, and the XCom result stored indicates this + # ti should be skipped, set ti.state to SKIPPED and fail the rule so that the + # ti does not execute. + ti.set_state(State.SKIPPED, session) + yield self._failing_status( + reason="Skipping because of previous XCom result from parent task {}" + .format(parent.task_id) + ) + return diff --git a/requirements/requirements-python2.7.txt b/requirements/requirements-python2.7.txt index 6973e5a..2dc0f9b 100644 --- a/requirements/requirements-python2.7.txt +++ b/requirements/requirements-python2.7.txt @@ -8,12 +8,12 @@ Flask-Caching==1.3.3 Flask-JWT-Extended==3.24.1 Flask-Login==0.4.1 Flask-OpenID==1.2.5 -Flask-SQLAlchemy==2.4.3 +Flask-SQLAlchemy==2.4.4 Flask-WTF==0.14.3 Flask==1.1.2 JPype1==0.7.1 JayDeBeApi==1.2.3 -Jinja2==2.10.3 +Jinja2==2.11.2 Mako==1.1.3 Markdown==2.6.11 MarkupSafe==1.1.1 @@ -38,7 +38,7 @@ ansiwrap==0.8.4 apipkg==1.5 apispec==2.0.2 appdirs==1.4.4 -argcomplete==1.11.1 +argcomplete==1.12.0 asn1crypto==1.3.0 aspy.yaml==1.3.0 astroid==1.6.6 @@ -48,11 +48,11 @@ attrs==19.3.0 aws-sam-translator==1.25.0 aws-xray-sdk==2.6.0 azure-common==1.1.25 -azure-cosmos==3.1.2 +azure-cosmos==3.2.0 azure-datalake-store==0.0.48 azure-mgmt-containerinstance==1.5.0 azure-mgmt-nspkg==3.0.2 -azure-mgmt-resource==10.0.0 +azure-mgmt-resource==10.1.0 azure-nspkg==3.0.2 azure-storage-blob==2.1.0 azure-storage-common==2.1.0 @@ -69,9 +69,9 @@ beautifulsoup4==4.7.1 billiard==3.6.3.0 bleach==3.1.5 blinker==1.4 -boto3==1.14.14 +boto3==1.14.25 boto==2.49.0 -botocore==1.17.14 +botocore==1.17.25 cached-property==1.5.1 cachetools==3.1.1 cassandra-driver==3.20.2 @@ -80,7 +80,7 @@ celery==4.4.6 certifi==2020.6.20 cffi==1.14.0 cfgv==2.0.1 -cfn-lint==0.33.2 +cfn-lint==0.34.0 cgroupspy==0.1.6 chardet==3.0.4 click==6.7 @@ -91,11 +91,11 @@ configparser==3.5.3 contextdecorator==0.10.0 contextlib2==0.6.0.post1 cookies==2.2.1 -coverage==5.1 +coverage==5.2 croniter==0.3.34 -cryptography==2.9.2 +cryptography==3.0 cx-Oracle==7.3.0 -datadog==0.37.1 +datadog==0.38.0 decorator==4.4.2 defusedxml==0.6.0 dill==0.3.2 @@ -112,13 +112,13 @@ email-validator==1.1.1 entrypoints==0.3 enum34==1.1.10 execnet==1.7.1 -fastavro==0.23.5 +fastavro==0.23.6 filelock==3.0.12 flake8-colors==0.1.6 flake8==3.8.3 -flaky==3.6.1 -flask-swagger==0.2.13 -flower==0.9.4 +flaky==3.7.0 +flask-swagger==0.2.14 +flower==0.9.5 freezegun==0.3.15 funcsigs==1.0.2 functools32==3.2.3.post2 @@ -126,13 +126,13 @@ future-fstrings==1.2.0 future==0.18.2 futures==3.3.0 gcsfs==0.2.3 -google-api-core==1.21.0 -google-api-python-client==1.9.3 -google-auth-httplib2==0.0.3 +google-api-core==1.22.0 +google-api-python-client==1.10.0 +google-auth-httplib2==0.0.4 google-auth-oauthlib==0.4.1 -google-auth==1.18.0 -google-cloud-bigquery==1.25.0 -google-cloud-bigtable==1.2.1 +google-auth==1.19.2 +google-cloud-bigquery==1.26.0 +google-cloud-bigtable==1.3.0 google-cloud-container==1.0.1 google-cloud-core==1.3.0 google-cloud-dlp==1.0.0 @@ -147,7 +147,7 @@ google-cloud-videointelligence==1.15.0 google-cloud-vision==1.0.0 google-resumable-media==0.5.1 googleapis-common-protos==1.52.0 -graphviz==0.14 +graphviz==0.14.1 grpc-google-iam-v1==0.12.3 grpcio-gcp==0.2.2 grpcio==1.30.0 @@ -155,9 +155,9 @@ gunicorn==19.10.0 hdfs==2.5.8 hmsclient==0.1.1 httplib2==0.18.1 -humanize==0.5.1 +humanize==1.0.0 hvac==0.10.4 -identify==1.4.21 +identify==1.4.25 idna==2.10 ijson==2.6.1 imagesize==1.2.0 @@ -230,9 +230,10 @@ pluggy==0.13.1 pre-commit==1.21.0 presto-python-client==0.7.0 prison==0.1.0 +prometheus-client==0.8.0 prompt-toolkit==1.0.18 protobuf==3.12.2 -psutil==5.7.0 +psutil==5.7.2 psycopg2-binary==2.8.5 ptyprocess==0.6.0 py==1.9.0 @@ -255,8 +256,8 @@ pytest-cov==2.10.0 pytest-forked==1.2.0 pytest-instafail==0.4.2 pytest-rerunfailures==9.0 -pytest-timeout==1.4.1 -pytest-xdist==1.32.0 +pytest-timeout==1.4.2 +pytest-xdist==1.33.0 pytest==4.6.11 python-daemon==2.2.4 python-dateutil==2.8.1 @@ -268,7 +269,7 @@ python-nvd3==0.15.0 python-openid==2.2.5 python-slugify==4.0.1 pytz==2020.1 -pytzdata==2019.3 +pytzdata==2020.1 pywinrm==0.4.1 pyzmq==19.0.1 qds-sdk==1.16.0 @@ -287,7 +288,7 @@ sasl==0.2.1 scandir==1.10.0 sendgrid==5.6.0 sentinels==1.0.0 -sentry-sdk==0.15.1 +sentry-sdk==0.16.1 setproctitle==1.1.10 simplegeneric==0.8.1 singledispatch==3.4.0.3 @@ -318,11 +319,11 @@ thrift==0.13.0 tokenize-rt==3.2.0 toml==0.10.1 tornado==5.1.1 -tqdm==4.47.0 +tqdm==4.48.0 traceback2==1.4.0 traitlets==4.3.3 typing-extensions==3.7.4.2 -typing==3.7.4.1 +typing==3.7.4.3 tzlocal==1.5.1 unicodecsv==0.14.1 unittest2==1.1.0 @@ -330,13 +331,13 @@ uritemplate==3.0.1 urllib3==1.25.9 vertica-python==0.10.4 vine==1.3.0 -virtualenv==20.0.25 +virtualenv==20.0.27 wcwidth==0.2.5 webencodings==0.5.1 websocket-client==0.57.0 wrapt==1.12.1 xmltodict==0.12.0 -yamllint==1.23.0 +yamllint==1.24.2 zdesk==2.7.1 zipp==1.2.0 zope.deprecation==4.4.0 diff --git a/requirements/requirements-python3.8.txt b/requirements/requirements-python3.8.txt index 747ae42..e715477 100644 --- a/requirements/requirements-python3.8.txt +++ b/requirements/requirements-python3.8.txt @@ -8,12 +8,12 @@ Flask-Caching==1.3.3 Flask-JWT-Extended==3.24.1 Flask-Login==0.4.1 Flask-OpenID==1.2.5 -Flask-SQLAlchemy==2.4.3 +Flask-SQLAlchemy==2.4.4 Flask-WTF==0.14.3 Flask==1.1.2 JPype1==0.7.1 JayDeBeApi==1.2.3 -Jinja2==2.10.3 +Jinja2==2.11.2 Mako==1.1.3 Markdown==2.6.11 MarkupSafe==1.1.1 @@ -24,9 +24,9 @@ PySmbClient==0.1.5 PyYAML==5.3.1 Pygments==2.6.1 SQLAlchemy-JSONField==0.9.0 -SQLAlchemy-Utils==0.36.6 +SQLAlchemy-Utils==0.36.8 SQLAlchemy==1.3.18 -Sphinx==3.1.1 +Sphinx==3.1.2 Unidecode==1.1.1 WTForms==2.3.1 Werkzeug==0.16.1 @@ -39,7 +39,7 @@ ansiwrap==0.8.4 apipkg==1.5 apispec==1.3.3 appdirs==1.4.4 -argcomplete==1.11.1 +argcomplete==1.12.0 asn1crypto==1.3.0 astroid==2.4.2 async-generator==1.10 @@ -48,10 +48,10 @@ attrs==19.3.0 aws-sam-translator==1.25.0 aws-xray-sdk==2.6.0 azure-common==1.1.25 -azure-cosmos==3.1.2 +azure-cosmos==3.2.0 azure-datalake-store==0.0.48 azure-mgmt-containerinstance==1.5.0 -azure-mgmt-resource==10.0.0 +azure-mgmt-resource==10.1.0 azure-nspkg==3.0.2 azure-storage-blob==2.1.0 azure-storage-common==2.1.0 @@ -62,9 +62,9 @@ beautifulsoup4==4.7.1 billiard==3.6.3.0 black==19.10b0 blinker==1.4 -boto3==1.14.14 +boto3==1.14.25 boto==2.49.0 -botocore==1.17.14 +botocore==1.17.25 cached-property==1.5.1 cachetools==4.1.1 cassandra-driver==3.20.2 @@ -73,7 +73,7 @@ celery==4.4.6 certifi==2020.6.20 cffi==1.14.0 cfgv==3.1.0 -cfn-lint==0.33.2 +cfn-lint==0.34.0 cgroupspy==0.1.6 chardet==3.0.4 click==6.7 @@ -81,11 +81,11 @@ cloudant==0.5.10 colorama==0.4.3 colorlog==4.0.2 configparser==3.5.3 -coverage==5.1 +coverage==5.2 croniter==0.3.34 -cryptography==2.9.2 +cryptography==3.0 cx-Oracle==8.0.0 -datadog==0.37.1 +datadog==0.38.0 decorator==4.4.2 defusedxml==0.6.0 dill==0.3.2 @@ -101,27 +101,27 @@ elasticsearch==5.5.3 email-validator==1.1.1 entrypoints==0.3 execnet==1.7.1 -fastavro==0.23.5 +fastavro==0.23.6 filelock==3.0.12 flake8-colors==0.1.6 flake8==3.8.3 -flaky==3.6.1 -flask-swagger==0.2.13 -flower==0.9.4 +flaky==3.7.0 +flask-swagger==0.2.14 +flower==0.9.5 freezegun==0.3.15 fsspec==0.7.4 funcsigs==1.0.2 future-fstrings==1.2.0 future==0.18.2 gcsfs==0.6.2 -google-api-core==1.21.0 -google-api-python-client==1.9.3 -google-auth-httplib2==0.0.3 +google-api-core==1.22.0 +google-api-python-client==1.10.0 +google-auth-httplib2==0.0.4 google-auth-oauthlib==0.4.1 -google-auth==1.18.0 -google-cloud-bigquery==1.25.0 -google-cloud-bigtable==1.2.1 -google-cloud-container==1.0.1 +google-auth==1.19.2 +google-cloud-bigquery==1.26.0 +google-cloud-bigtable==1.3.0 +google-cloud-container==2.0.0 google-cloud-core==1.3.0 google-cloud-dlp==1.0.0 google-cloud-language==1.3.0 @@ -135,17 +135,17 @@ google-cloud-videointelligence==1.15.0 google-cloud-vision==1.0.0 google-resumable-media==0.5.1 googleapis-common-protos==1.52.0 -graphviz==0.14 +graphviz==0.14.1 grpc-google-iam-v1==0.12.3 grpcio-gcp==0.2.2 grpcio==1.30.0 -gunicorn==19.10.0 +gunicorn==20.0.4 hdfs==2.5.8 hmsclient==0.1.1 httplib2==0.18.1 -humanize==0.5.1 +humanize==2.5.0 hvac==0.10.4 -identify==1.4.21 +identify==1.4.25 idna==2.10 imagesize==1.2.0 importlib-metadata==1.7.0 @@ -156,7 +156,7 @@ ipython==7.16.1 iso8601==0.1.12 isodate==0.6.0 itsdangerous==1.1.0 -jedi==0.17.1 +jedi==0.17.2 jira==2.0.0 jmespath==0.10.0 json-merge-patch==0.2 @@ -166,12 +166,13 @@ jsonpickle==1.4.1 jsonpointer==2.0 jsonschema==3.2.0 junit-xml==1.9 -jupyter-client==6.1.5 +jupyter-client==6.1.6 jupyter-core==4.6.3 kombu==4.6.11 kubernetes==11.0.0 lazy-object-proxy==1.5.0 ldap3==2.7 +libcst==0.3.7 lockfile==0.12.2 marshmallow-enum==1.5.1 marshmallow-sqlalchemy==0.23.1 @@ -188,14 +189,14 @@ mypy-extensions==0.4.3 mypy==0.720 mysqlclient==1.3.14 natsort==7.0.1 -nbclient==0.4.0 +nbclient==0.4.1 nbformat==5.0.7 -nest-asyncio==1.3.3 +nest-asyncio==1.4.0 networkx==2.4 nodeenv==1.4.0 nteract-scrapbook==0.4.1 ntlm-auth==1.5.0 -numpy==1.19.0 +numpy==1.19.1 oauthlib==3.1.0 oscrypto==1.2.0 packaging==20.4 @@ -212,12 +213,14 @@ pexpect==4.8.0 pickleshare==0.7.5 pinotdb==0.1.1 pluggy==0.13.1 -pre-commit==2.5.1 +pre-commit==2.6.0 presto-python-client==0.7.0 prison==0.1.3 +prometheus-client==0.8.0 prompt-toolkit==3.0.5 +proto-plus==1.3.2 protobuf==3.12.2 -psutil==5.7.0 +psutil==5.7.2 psycopg2-binary==2.8.5 ptyprocess==0.6.0 py==1.9.0 @@ -240,8 +243,8 @@ pytest-cov==2.10.0 pytest-forked==1.2.0 pytest-instafail==0.4.2 pytest-rerunfailures==9.0 -pytest-timeout==1.4.1 -pytest-xdist==1.32.0 +pytest-timeout==1.4.2 +pytest-xdist==1.33.0 pytest==5.4.3 python-daemon==2.2.4 python-dateutil==2.8.1 @@ -253,12 +256,12 @@ python-nvd3==0.15.0 python-slugify==4.0.1 python3-openid==3.2.0 pytz==2020.1 -pytzdata==2019.3 +pytzdata==2020.1 pywinrm==0.4.1 pyzmq==19.0.1 qds-sdk==1.16.0 redis==3.5.3 -regex==2020.6.8 +regex==2020.7.14 requests-futures==0.9.4 requests-kerberos==0.12.0 requests-mock==1.8.0 @@ -272,12 +275,12 @@ s3transfer==0.3.3 sasl==0.2.1 sendgrid==5.6.0 sentinels==1.0.0 -sentry-sdk==0.15.1 +sentry-sdk==0.16.1 setproctitle==1.1.10 six==1.15.0 slackclient==1.3.2 snowballstemmer==2.0.0 -snowflake-connector-python==2.2.8 +snowflake-connector-python==2.2.9 snowflake-sqlalchemy==1.2.3 soupsieve==2.0.1 sphinx-argparse==0.2.5 @@ -304,22 +307,23 @@ thrift-sasl==0.4.2 thrift==0.13.0 toml==0.10.1 tornado==5.1.1 -tqdm==4.47.0 +tqdm==4.48.0 traitlets==4.3.3 typed-ast==1.4.1 typing-extensions==3.7.4.2 +typing-inspect==0.6.0 tzlocal==1.5.1 unicodecsv==0.14.1 uritemplate==3.0.1 urllib3==1.25.9 vertica-python==0.10.4 vine==1.3.0 -virtualenv==20.0.25 +virtualenv==20.0.27 wcwidth==0.2.5 websocket-client==0.57.0 wrapt==1.12.1 xmltodict==0.12.0 -yamllint==1.23.0 +yamllint==1.24.2 zdesk==2.7.1 zipp==3.1.0 zope.deprecation==4.4.0 diff --git a/requirements/setup-2.7.md5 b/requirements/setup-2.7.md5 index 7302c51..d24fa17 100644 --- a/requirements/setup-2.7.md5 +++ b/requirements/setup-2.7.md5 @@ -1 +1 @@ -da591fb5f6ed08129068e227610706cb /opt/airflow/setup.py +52a5d9b968ee82e35b5b49ed02361377 /opt/airflow/setup.py diff --git a/requirements/setup-3.8.md5 b/requirements/setup-3.8.md5 index 7302c51..d24fa17 100644 --- a/requirements/setup-3.8.md5 +++ b/requirements/setup-3.8.md5 @@ -1 +1 @@ -da591fb5f6ed08129068e227610706cb /opt/airflow/setup.py +52a5d9b968ee82e35b5b49ed02361377 /opt/airflow/setup.py diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 48f70a9..161e479 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -3011,3 +3011,42 @@ class SchedulerJobTest(unittest.TestCase): self.assertIsNone(start_date) self.assertIsNone(end_date) self.assertIsNone(duration) + + +def test_task_with_upstream_skip_process_task_instances(): + """ + Test if _process_task_instances puts a task instance into SKIPPED state if any of its + upstream tasks are skipped according to TriggerRuleDep. + """ + with DAG( + dag_id='test_task_with_upstream_skip_dag', + start_date=DEFAULT_DATE, + schedule_interval=None + ) as dag: + dummy1 = DummyOperator(task_id='dummy1') + dummy2 = DummyOperator(task_id="dummy2") + dummy3 = DummyOperator(task_id="dummy3") + [dummy1, dummy2] >> dummy3 + + dag_file_processor = SchedulerJob(dag_ids=[], log=mock.MagicMock()) + dag.clear() + dr = dag.create_dagrun(run_id="manual__{}".format(DEFAULT_DATE.isoformat()), + state=State.RUNNING, + execution_date=DEFAULT_DATE) + assert dr is not None + + with create_session() as session: + tis = {ti.task_id: ti for ti in dr.get_task_instances(session=session)} + # Set dummy1 to skipped and dummy2 to success. dummy3 remains as none. + tis[dummy1.task_id].state = State.SKIPPED + tis[dummy2.task_id].state = State.SUCCESS + assert tis[dummy3.task_id].state == State.NONE + + dag_file_processor._process_task_instances(dag, task_instances_list=Mock()) + + with create_session() as session: + tis = {ti.task_id: ti for ti in dr.get_task_instances(session=session)} + assert tis[dummy1.task_id].state == State.SKIPPED + assert tis[dummy2.task_id].state == State.SUCCESS + # dummy3 should be skipped because dummy1 is skipped. + assert tis[dummy3.task_id].state == State.SKIPPED diff --git a/tests/operators/test_latest_only_operator.py b/tests/operators/test_latest_only_operator.py index 3edff8d..6f23f59 100644 --- a/tests/operators/test_latest_only_operator.py +++ b/tests/operators/test_latest_only_operator.py @@ -47,15 +47,40 @@ def get_task_instances(task_id): class LatestOnlyOperatorTest(unittest.TestCase): + @classmethod + def setUpClass(cls): + from tests.compat import MagicMock + from airflow.jobs import SchedulerJob - def setUp(self): - super(LatestOnlyOperatorTest, self).setUp() - self.dag = DAG( + cls.dag = DAG( 'test_dag', default_args={ 'owner': 'airflow', 'start_date': DEFAULT_DATE}, schedule_interval=INTERVAL) + + cls.dag.create_dagrun( + run_id="manual__1", + execution_date=DEFAULT_DATE, + state=State.RUNNING + ) + + cls.dag.create_dagrun( + run_id="manual__2", + execution_date=timezone.datetime(2016, 1, 1, 12), + state=State.RUNNING + ) + + cls.dag.create_dagrun( + run_id="manual__3", + execution_date=END_DATE, + state=State.RUNNING + ) + + cls.dag_file_processor = SchedulerJob(dag_ids=[], log=MagicMock()) + + def setUp(self): + super(LatestOnlyOperatorTest, self).setUp() self.addCleanup(self.dag.clear) freezer = freeze_time(FROZEN_NOW) freezer.start() @@ -86,6 +111,7 @@ class LatestOnlyOperatorTest(unittest.TestCase): downstream_task2.run(start_date=DEFAULT_DATE, end_date=END_DATE) latest_instances = get_task_instances('latest') + self.dag_file_processor._process_task_instances(self.dag, task_instances_list=latest_instances) exec_date_to_latest_state = { ti.execution_date: ti.state for ti in latest_instances} self.assertEqual({ @@ -95,6 +121,7 @@ class LatestOnlyOperatorTest(unittest.TestCase): exec_date_to_latest_state) downstream_instances = get_task_instances('downstream') + self.dag_file_processor._process_task_instances(self.dag, task_instances_list=downstream_instances) exec_date_to_downstream_state = { ti.execution_date: ti.state for ti in downstream_instances} self.assertEqual({ @@ -104,6 +131,7 @@ class LatestOnlyOperatorTest(unittest.TestCase): exec_date_to_downstream_state) downstream_instances = get_task_instances('downstream_2') + self.dag_file_processor._process_task_instances(self.dag, task_instances_list=downstream_instances) exec_date_to_downstream_state = { ti.execution_date: ti.state for ti in downstream_instances} self.assertEqual({ @@ -126,32 +154,13 @@ class LatestOnlyOperatorTest(unittest.TestCase): downstream_task.set_upstream(latest_task) downstream_task2.set_upstream(downstream_task) - self.dag.create_dagrun( - run_id="manual__1", - start_date=timezone.utcnow(), - execution_date=DEFAULT_DATE, - state=State.RUNNING - ) - - self.dag.create_dagrun( - run_id="manual__2", - start_date=timezone.utcnow(), - execution_date=timezone.datetime(2016, 1, 1, 12), - state=State.RUNNING - ) - - self.dag.create_dagrun( - run_id="manual__3", - start_date=timezone.utcnow(), - execution_date=END_DATE, - state=State.RUNNING - ) - latest_task.run(start_date=DEFAULT_DATE, end_date=END_DATE) downstream_task.run(start_date=DEFAULT_DATE, end_date=END_DATE) downstream_task2.run(start_date=DEFAULT_DATE, end_date=END_DATE) latest_instances = get_task_instances('latest') + self.dag_file_processor._process_task_instances(self.dag, task_instances_list=latest_instances) + exec_date_to_latest_state = { ti.execution_date: ti.state for ti in latest_instances} self.assertEqual({ @@ -161,6 +170,8 @@ class LatestOnlyOperatorTest(unittest.TestCase): exec_date_to_latest_state) downstream_instances = get_task_instances('downstream') + self.dag_file_processor._process_task_instances(self.dag, task_instances_list=downstream_instances) + exec_date_to_downstream_state = { ti.execution_date: ti.state for ti in downstream_instances} self.assertEqual({ @@ -170,6 +181,7 @@ class LatestOnlyOperatorTest(unittest.TestCase): exec_date_to_downstream_state) downstream_instances = get_task_instances('downstream_2') + self.dag_file_processor._process_task_instances(self.dag, task_instances_list=downstream_instances) exec_date_to_downstream_state = { ti.execution_date: ti.state for ti in downstream_instances} self.assertEqual({ diff --git a/tests/operators/test_python_operator.py b/tests/operators/test_python_operator.py index 6f3dfe2..a92213a 100644 --- a/tests/operators/test_python_operator.py +++ b/tests/operators/test_python_operator.py @@ -32,6 +32,7 @@ from datetime import timedelta, date from airflow.exceptions import AirflowException from airflow.models import TaskInstance as TI, DAG, DagRun +from airflow.models.taskinstance import clear_task_instances from airflow.operators.dummy_operator import DummyOperator from airflow.operators.python_operator import PythonOperator, BranchPythonOperator from airflow.operators.python_operator import ShortCircuitOperator @@ -491,7 +492,7 @@ class BranchOperatorTest(unittest.TestCase): elif ti.task_id == 'branch_2': self.assertEqual(ti.state, State.NONE) else: - raise Exception + raise def test_with_skip_in_branch_downstream_dependencies2(self): self.branch_op = BranchPythonOperator(task_id='make_choice', @@ -520,7 +521,63 @@ class BranchOperatorTest(unittest.TestCase): elif ti.task_id == 'branch_2': self.assertEqual(ti.state, State.NONE) else: - raise Exception + raise + + def test_clear_skipped_downstream_task(self): + """ + After a downstream task is skipped by BranchPythonOperator, clearing the skipped task + should not cause it to be executed. + """ + branch_op = BranchPythonOperator(task_id='make_choice', + dag=self.dag, + python_callable=lambda: 'branch_1') + branches = [self.branch_1, self.branch_2] + branch_op >> branches + self.dag.clear() + + dr = self.dag.create_dagrun( + run_id="manual__", + start_date=timezone.utcnow(), + execution_date=DEFAULT_DATE, + state=State.RUNNING + ) + + branch_op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) + + for task in branches: + task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) + + tis = dr.get_task_instances() + for ti in tis: + if ti.task_id == 'make_choice': + self.assertEqual(ti.state, State.SUCCESS) + elif ti.task_id == 'branch_1': + self.assertEqual(ti.state, State.SUCCESS) + elif ti.task_id == 'branch_2': + self.assertEqual(ti.state, State.SKIPPED) + else: + raise + + children_tis = [ti for ti in tis if ti.task_id in branch_op.get_direct_relative_ids()] + + # Clear the children tasks. + with create_session() as session: + clear_task_instances(children_tis, session=session, dag=self.dag) + + # Run the cleared tasks again. + for task in branches: + task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) + + # Check if the states are correct after children tasks are cleared. + for ti in dr.get_task_instances(): + if ti.task_id == 'make_choice': + self.assertEqual(ti.state, State.SUCCESS) + elif ti.task_id == 'branch_1': + self.assertEqual(ti.state, State.SUCCESS) + elif ti.task_id == 'branch_2': + self.assertEqual(ti.state, State.SKIPPED) + else: + raise class ShortCircuitOperatorTest(unittest.TestCase): @@ -660,3 +717,61 @@ class ShortCircuitOperatorTest(unittest.TestCase): self.assertEqual(ti.state, State.NONE) else: raise + + def test_clear_skipped_downstream_task(self): + """ + After a downstream task is skipped by ShortCircuitOperator, clearing the skipped task + should not cause it to be executed. + """ + dag = DAG('shortcircuit_clear_skipped_downstream_task', + default_args={ + 'owner': 'airflow', + 'start_date': DEFAULT_DATE + }, + schedule_interval=INTERVAL) + short_op = ShortCircuitOperator(task_id='make_choice', + dag=dag, + python_callable=lambda: False) + downstream = DummyOperator(task_id='downstream', dag=dag) + + short_op >> downstream + + dag.clear() + + dr = dag.create_dagrun( + run_id="manual__", + start_date=timezone.utcnow(), + execution_date=DEFAULT_DATE, + state=State.RUNNING + ) + + short_op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) + downstream.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) + + tis = dr.get_task_instances() + + for ti in tis: + if ti.task_id == 'make_choice': + self.assertEqual(ti.state, State.SUCCESS) + elif ti.task_id == 'downstream': + self.assertEqual(ti.state, State.SKIPPED) + else: + raise + + # Clear downstream + with create_session() as session: + clear_task_instances([t for t in tis if t.task_id == "downstream"], + session=session, + dag=dag) + + # Run downstream again + downstream.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) + + # Check if the states are correct. + for ti in dr.get_task_instances(): + if ti.task_id == 'make_choice': + self.assertEqual(ti.state, State.SUCCESS) + elif ti.task_id == 'downstream': + self.assertEqual(ti.state, State.SKIPPED) + else: + raise diff --git a/tests/ti_deps/deps/test_not_previously_skipped_dep.py b/tests/ti_deps/deps/test_not_previously_skipped_dep.py new file mode 100644 index 0000000..30da9cf --- /dev/null +++ b/tests/ti_deps/deps/test_not_previously_skipped_dep.py @@ -0,0 +1,133 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import pendulum + +from airflow.models import DAG, TaskInstance +from airflow.operators.dummy_operator import DummyOperator +from airflow.operators.python_operator import BranchPythonOperator +from airflow.ti_deps.dep_context import DepContext +from airflow.ti_deps.deps.not_previously_skipped_dep import NotPreviouslySkippedDep +from airflow.utils.db import create_session +from airflow.utils.state import State + + +def test_no_parent(): + """ + A simple DAG with a single task. NotPreviouslySkippedDep is met. + """ + start_date = pendulum.datetime(2020, 1, 1) + dag = DAG("test_test_no_parent_dag", schedule_interval=None, start_date=start_date) + op1 = DummyOperator(task_id="op1", dag=dag) + + ti1 = TaskInstance(op1, start_date) + + with create_session() as session: + dep = NotPreviouslySkippedDep() + assert len(list(dep.get_dep_statuses(ti1, session, DepContext()))) == 0 + assert dep.is_met(ti1, session) + assert ti1.state != State.SKIPPED + + +def test_no_skipmixin_parent(): + """ + A simple DAG with no branching. Both op1 and op2 are DummyOperator. NotPreviouslySkippedDep is met. + """ + start_date = pendulum.datetime(2020, 1, 1) + dag = DAG( + "test_no_skipmixin_parent_dag", schedule_interval=None, start_date=start_date + ) + op1 = DummyOperator(task_id="op1", dag=dag) + op2 = DummyOperator(task_id="op2", dag=dag) + op1 >> op2 + + ti2 = TaskInstance(op2, start_date) + + with create_session() as session: + dep = NotPreviouslySkippedDep() + assert len(list(dep.get_dep_statuses(ti2, session, DepContext()))) == 0 + assert dep.is_met(ti2, session) + assert ti2.state != State.SKIPPED + + +def test_parent_follow_branch(): + """ + A simple DAG with a BranchPythonOperator that follows op2. NotPreviouslySkippedDep is met. + """ + start_date = pendulum.datetime(2020, 1, 1) + dag = DAG( + "test_parent_follow_branch_dag", schedule_interval=None, start_date=start_date + ) + op1 = BranchPythonOperator(task_id="op1", python_callable=lambda: "op2", dag=dag) + op2 = DummyOperator(task_id="op2", dag=dag) + op1 >> op2 + + TaskInstance(op1, start_date).run() + ti2 = TaskInstance(op2, start_date) + + with create_session() as session: + dep = NotPreviouslySkippedDep() + assert len(list(dep.get_dep_statuses(ti2, session, DepContext()))) == 0 + assert dep.is_met(ti2, session) + assert ti2.state != State.SKIPPED + + +def test_parent_skip_branch(): + """ + A simple DAG with a BranchPythonOperator that does not follow op2. NotPreviouslySkippedDep is not met. + """ + start_date = pendulum.datetime(2020, 1, 1) + dag = DAG( + "test_parent_skip_branch_dag", schedule_interval=None, start_date=start_date + ) + op1 = BranchPythonOperator(task_id="op1", python_callable=lambda: "op3", dag=dag) + op2 = DummyOperator(task_id="op2", dag=dag) + op3 = DummyOperator(task_id="op3", dag=dag) + op1 >> [op2, op3] + + TaskInstance(op1, start_date).run() + ti2 = TaskInstance(op2, start_date) + + with create_session() as session: + dep = NotPreviouslySkippedDep() + assert len(list(dep.get_dep_statuses(ti2, session, DepContext()))) == 1 + assert not dep.is_met(ti2, session) + assert ti2.state == State.SKIPPED + + +def test_parent_not_executed(): + """ + A simple DAG with a BranchPythonOperator that does not follow op2. Parent task is not yet + executed (no xcom data). NotPreviouslySkippedDep is met (no decision). + """ + start_date = pendulum.datetime(2020, 1, 1) + dag = DAG( + "test_parent_not_executed_dag", schedule_interval=None, start_date=start_date + ) + op1 = BranchPythonOperator(task_id="op1", python_callable=lambda: "op3", dag=dag) + op2 = DummyOperator(task_id="op2", dag=dag) + op3 = DummyOperator(task_id="op3", dag=dag) + op1 >> [op2, op3] + + ti2 = TaskInstance(op2, start_date) + + with create_session() as session: + dep = NotPreviouslySkippedDep() + assert len(list(dep.get_dep_statuses(ti2, session, DepContext()))) == 0 + assert dep.is_met(ti2, session) + assert ti2.state == State.NONE diff --git a/tests/ti_deps/deps/test_trigger_rule_dep.py b/tests/ti_deps/deps/test_trigger_rule_dep.py index 45514f6..8255015 100644 --- a/tests/ti_deps/deps/test_trigger_rule_dep.py +++ b/tests/ti_deps/deps/test_trigger_rule_dep.py @@ -165,6 +165,46 @@ class TriggerRuleDepTest(unittest.TestCase): self.assertEqual(len(dep_statuses), 1) self.assertFalse(dep_statuses[0].passed) + def test_all_success_tr_skip(self): + """ + All-success trigger rule fails when some upstream tasks are skipped. + """ + ti = self._get_task_instance(TriggerRule.ALL_SUCCESS, + upstream_task_ids=["FakeTaskID", + "OtherFakeTaskID"]) + dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule( + ti=ti, + successes=1, + skipped=1, + failed=0, + upstream_failed=0, + done=2, + flag_upstream_failed=False, + session="Fake Session")) + self.assertEqual(len(dep_statuses), 1) + self.assertFalse(dep_statuses[0].passed) + + def test_all_success_tr_skip_flag_upstream(self): + """ + All-success trigger rule fails when some upstream tasks are skipped. The state of the ti + should be set to SKIPPED when flag_upstream_failed is True. + """ + ti = self._get_task_instance(TriggerRule.ALL_SUCCESS, + upstream_task_ids=["FakeTaskID", + "OtherFakeTaskID"]) + dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule( + ti=ti, + successes=1, + skipped=1, + failed=0, + upstream_failed=0, + done=2, + flag_upstream_failed=True, + session=Mock())) + self.assertEqual(len(dep_statuses), 1) + self.assertFalse(dep_statuses[0].passed) + self.assertEqual(ti.state, State.SKIPPED) + def test_none_failed_tr_success(self): """ All success including skip trigger rule success
