This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 179e93093b7b21fa60cf8d5984ec4fd0c6b52730
Author: yuqian90 <[email protected]>
AuthorDate: Fri Feb 21 19:35:55 2020 +0800

    [AIRFLOW-5391] Do not re-run skipped tasks when they are cleared (#7276)
    
    If a task is skipped by BranchPythonOperator, BaseBranchOperator or
    ShortCircuitOperator and the user later clears the skipped task, it will
    execute. This is probably not the right behaviour.
    
    This commit changes that behaviour so such tasks will be skipped again when
    cleared. This can be overridden by running the task again with the
    "Ignore Task Deps" override.
    
    (cherry picked from commit 1cdab56a6192f69962506b7ff632c986c84eb10d)
---
 UPDATING.md                                        |   7 ++
 airflow/models/baseoperator.py                     |   2 +
 airflow/models/skipmixin.py                        | 112 ++++++++++++-----
 airflow/ti_deps/dep_context.py                     |  27 ++++-
 airflow/ti_deps/deps/not_previously_skipped_dep.py |  88 ++++++++++++++
 requirements/requirements-python2.7.txt            |  67 ++++++-----
 requirements/requirements-python3.8.txt            |  92 +++++++-------
 requirements/setup-2.7.md5                         |   2 +-
 requirements/setup-3.8.md5                         |   2 +-
 tests/jobs/test_scheduler_job.py                   |  39 ++++++
 tests/operators/test_latest_only_operator.py       |  60 ++++++----
 tests/operators/test_python_operator.py            | 119 +++++++++++++++++-
 .../deps/test_not_previously_skipped_dep.py        | 133 +++++++++++++++++++++
 tests/ti_deps/deps/test_trigger_rule_dep.py        |  40 +++++++
 14 files changed, 655 insertions(+), 135 deletions(-)

diff --git a/UPDATING.md b/UPDATING.md
index 61734bb..f82ba10 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -25,6 +25,7 @@ assists users migrating to a new version.
 <!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->
 **Table of contents**
 
+- [Airflow 1.10.12](#airflow-11012)
 - [Airflow 1.10.11](#airflow-11011)
 - [Airflow 1.10.10](#airflow-11010)
 - [Airflow 1.10.9](#airflow-1109)
@@ -59,6 +60,12 @@ More tips can be found in the guide:
 https://developers.google.com/style/inclusive-documentation
 
 -->
+## Airflow 1.10.12
+
+### Clearing tasks skipped by SkipMixin will skip them
+
+Previously, when tasks skipped by SkipMixin (such as BranchPythonOperator,
+BaseBranchOperator and ShortCircuitOperator) were cleared, they would execute.
+Since 1.10.12, when such skipped tasks are cleared, they will be skipped again
+by the newly introduced NotPreviouslySkippedDep.
 
 ## Airflow 1.10.11
 
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 52037c5..266ad64 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -45,6 +45,7 @@ from airflow.models.pool import Pool
 from airflow.models.taskinstance import TaskInstance, clear_task_instances
 from airflow.models.xcom import XCOM_RETURN_KEY
 from airflow.ti_deps.deps.not_in_retry_period_dep import NotInRetryPeriodDep
+from airflow.ti_deps.deps.not_previously_skipped_dep import 
NotPreviouslySkippedDep
 from airflow.ti_deps.deps.prev_dagrun_dep import PrevDagrunDep
 from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep
 from airflow.utils import timezone
@@ -575,6 +576,7 @@ class BaseOperator(LoggingMixin):
             NotInRetryPeriodDep(),
             PrevDagrunDep(),
             TriggerRuleDep(),
+            NotPreviouslySkippedDep(),
         }
 
     @property
diff --git a/airflow/models/skipmixin.py b/airflow/models/skipmixin.py
index 57341d8..3b4531f 100644
--- a/airflow/models/skipmixin.py
+++ b/airflow/models/skipmixin.py
@@ -19,28 +19,28 @@
 
 from airflow.models.taskinstance import TaskInstance
 from airflow.utils import timezone
-from airflow.utils.db import provide_session
+from airflow.utils.db import create_session, provide_session
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.state import State
 
 import six
-from typing import Union, Iterable, Set
+from typing import Set
+
+# The key used by SkipMixin to store XCom data.
+XCOM_SKIPMIXIN_KEY = "skipmixin_key"
+
+# The dictionary key used to denote task IDs that are skipped
+XCOM_SKIPMIXIN_SKIPPED = "skipped"
+
+# The dictionary key used to denote task IDs that are followed
+XCOM_SKIPMIXIN_FOLLOWED = "followed"
 
 
 class SkipMixin(LoggingMixin):
-    @provide_session
-    def skip(self, dag_run, execution_date, tasks, session=None):
+    def _set_state_to_skipped(self, dag_run, execution_date, tasks, session):
         """
-        Sets tasks instances to skipped from the same dag run.
-
-        :param dag_run: the DagRun for which to set the tasks to skipped
-        :param execution_date: execution_date
-        :param tasks: tasks to skip (not task_ids)
-        :param session: db session to use
+        Used internally to set state of task instances to skipped from the 
same dag run.
         """
-        if not tasks:
-            return
-
         task_ids = [d.task_id for d in tasks]
         now = timezone.utcnow()
 
@@ -48,12 +48,15 @@ class SkipMixin(LoggingMixin):
             session.query(TaskInstance).filter(
                 TaskInstance.dag_id == dag_run.dag_id,
                 TaskInstance.execution_date == dag_run.execution_date,
-                TaskInstance.task_id.in_(task_ids)
-            ).update({TaskInstance.state: State.SKIPPED,
-                      TaskInstance.start_date: now,
-                      TaskInstance.end_date: now},
-                     synchronize_session=False)
-            session.commit()
+                TaskInstance.task_id.in_(task_ids),
+            ).update(
+                {
+                    TaskInstance.state: State.SKIPPED,
+                    TaskInstance.start_date: now,
+                    TaskInstance.end_date: now,
+                },
+                synchronize_session=False,
+            )
         else:
             assert execution_date is not None, "Execution date is None and no 
dag run"
 
@@ -66,14 +69,56 @@ class SkipMixin(LoggingMixin):
                 ti.end_date = now
                 session.merge(ti)
 
-            session.commit()
+    @provide_session
+    def skip(
+        self, dag_run, execution_date, tasks, session=None,
+    ):
+        """
+        Sets tasks instances to skipped from the same dag run.
+
+        If this instance has a `task_id` attribute, store the list of skipped 
task IDs to XCom
+        so that NotPreviouslySkippedDep knows these tasks should be skipped 
when they
+        are cleared.
 
-    def skip_all_except(self, ti, branch_task_ids):
-        # type: (TaskInstance, Union[str, Iterable[str]]) -> None
+        :param dag_run: the DagRun for which to set the tasks to skipped
+        :param execution_date: execution_date
+        :param tasks: tasks to skip (not task_ids)
+        :param session: db session to use
+        """
+        if not tasks:
+            return
+
+        self._set_state_to_skipped(dag_run, execution_date, tasks, session)
+        session.commit()
+
+        # SkipMixin may not necessarily have a task_id attribute. Only store 
to XCom if one is available.
+        try:
+            task_id = self.task_id
+        except AttributeError:
+            task_id = None
+
+        if task_id is not None:
+            from airflow.models.xcom import XCom
+
+            XCom.set(
+                key=XCOM_SKIPMIXIN_KEY,
+                value={XCOM_SKIPMIXIN_SKIPPED: [d.task_id for d in tasks]},
+                task_id=task_id,
+                dag_id=dag_run.dag_id,
+                execution_date=dag_run.execution_date,
+                session=session
+            )
+
+    def skip_all_except(
+        self, ti, branch_task_ids
+    ):
         """
         This method implements the logic for a branching operator; given a 
single
         task ID or list of task IDs to follow, this skips all other tasks
         immediately downstream of this operator.
+
+        branch_task_ids is stored to XCom so that NotPreviouslySkippedDep 
knows skipped tasks or
+        newly added tasks should be skipped when they are cleared.
         """
         self.log.info("Following branch %s", branch_task_ids)
         if isinstance(branch_task_ids, six.string_types):
@@ -90,13 +135,22 @@ class SkipMixin(LoggingMixin):
             # is also a downstream task of the branch task, we exclude it from 
skipping.
             branch_downstream_task_ids = set()  # type: Set[str]
             for b in branch_task_ids:
-                branch_downstream_task_ids.update(dag.
-                                                  get_task(b).
-                                                  
get_flat_relative_ids(upstream=False))
+                branch_downstream_task_ids.update(
+                    dag.get_task(b).get_flat_relative_ids(upstream=False)
+                )
 
-            skip_tasks = [t for t in downstream_tasks
-                          if t.task_id not in branch_task_ids and
-                          t.task_id not in branch_downstream_task_ids]
+            skip_tasks = [
+                t
+                for t in downstream_tasks
+                if t.task_id not in branch_task_ids
+                and t.task_id not in branch_downstream_task_ids
+            ]
 
             self.log.info("Skipping tasks %s", [t.task_id for t in skip_tasks])
-            self.skip(dag_run, ti.execution_date, skip_tasks)
+            with create_session() as session:
+                self._set_state_to_skipped(
+                    dag_run, ti.execution_date, skip_tasks, session=session
+                )
+                ti.xcom_push(
+                    key=XCOM_SKIPMIXIN_KEY, value={XCOM_SKIPMIXIN_FOLLOWED: 
branch_task_ids}
+                )
diff --git a/airflow/ti_deps/dep_context.py b/airflow/ti_deps/dep_context.py
index c5d999a..74307e4 100644
--- a/airflow/ti_deps/dep_context.py
+++ b/airflow/ti_deps/dep_context.py
@@ -66,6 +66,8 @@ class DepContext(object):
     :type ignore_task_deps: bool
     :param ignore_ti_state: Ignore the task instance's previous failure/success
     :type ignore_ti_state: bool
+    :param finished_tasks: A list of all the finished tasks of this run
+    :type finished_tasks: list[airflow.models.TaskInstance]
     """
     def __init__(
             self,
@@ -76,7 +78,8 @@ class DepContext(object):
             ignore_in_retry_period=False,
             ignore_in_reschedule_period=False,
             ignore_task_deps=False,
-            ignore_ti_state=False):
+            ignore_ti_state=False,
+            finished_tasks=None):
         self.deps = deps or set()
         self.flag_upstream_failed = flag_upstream_failed
         self.ignore_all_deps = ignore_all_deps
@@ -85,6 +88,28 @@ class DepContext(object):
         self.ignore_in_reschedule_period = ignore_in_reschedule_period
         self.ignore_task_deps = ignore_task_deps
         self.ignore_ti_state = ignore_ti_state
+        self.finished_tasks = finished_tasks
+
+    def ensure_finished_tasks(self, dag, execution_date, session):
+        """
+        This method makes sure finished_tasks is populated if it's currently 
None.
+        This is for the strange feature of running tasks without dag_run.
+
+        :param dag: The DAG for which to find finished tasks
+        :type dag: airflow.models.DAG
+        :param execution_date: The execution_date to look for
+        :param session: Database session to use
+        :return: A list of all the finished tasks of this DAG and 
execution_date
+        :rtype: list[airflow.models.TaskInstance]
+        """
+        if self.finished_tasks is None:
+            self.finished_tasks = dag.get_task_instances(
+                start_date=execution_date,
+                end_date=execution_date,
+                state=State.finished() + [State.UPSTREAM_FAILED],
+                session=session,
+            )
+        return self.finished_tasks
 
 
 # In order to be able to get queued a task must have one of these states
diff --git a/airflow/ti_deps/deps/not_previously_skipped_dep.py 
b/airflow/ti_deps/deps/not_previously_skipped_dep.py
new file mode 100644
index 0000000..34ff6ac
--- /dev/null
+++ b/airflow/ti_deps/deps/not_previously_skipped_dep.py
@@ -0,0 +1,88 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
+
+
+class NotPreviouslySkippedDep(BaseTIDep):
+    """
+    Determines if any of the task's direct upstream relatives have decided 
this task should
+    be skipped.
+    """
+
+    NAME = "Not Previously Skipped"
+    IGNORABLE = True
+    IS_TASK_DEP = True
+
+    def _get_dep_statuses(
+        self, ti, session, dep_context
+    ):  # pylint: disable=signature-differs
+        from airflow.models.skipmixin import (
+            SkipMixin,
+            XCOM_SKIPMIXIN_KEY,
+            XCOM_SKIPMIXIN_SKIPPED,
+            XCOM_SKIPMIXIN_FOLLOWED,
+        )
+        from airflow.utils.state import State
+
+        upstream = ti.task.get_direct_relatives(upstream=True)
+
+        finished_tasks = dep_context.ensure_finished_tasks(
+            ti.task.dag, ti.execution_date, session
+        )
+
+        finished_task_ids = {t.task_id for t in finished_tasks}
+
+        for parent in upstream:
+            if isinstance(parent, SkipMixin):
+                if parent.task_id not in finished_task_ids:
+                    # This can happen if the parent task has not yet run.
+                    continue
+
+                prev_result = ti.xcom_pull(
+                    task_ids=parent.task_id, key=XCOM_SKIPMIXIN_KEY
+                )
+
+                if prev_result is None:
+                    # This can happen if the parent task has not yet run.
+                    continue
+
+                should_skip = False
+                if (
+                    XCOM_SKIPMIXIN_FOLLOWED in prev_result
+                    and ti.task_id not in prev_result[XCOM_SKIPMIXIN_FOLLOWED]
+                ):
+                    # Skip any tasks that are not in "followed"
+                    should_skip = True
+                elif (
+                    XCOM_SKIPMIXIN_SKIPPED in prev_result
+                    and ti.task_id in prev_result[XCOM_SKIPMIXIN_SKIPPED]
+                ):
+                    # Skip any tasks that are in "skipped"
+                    should_skip = True
+
+                if should_skip:
+                    # If the parent SkipMixin has run, and the XCom result 
stored indicates this
+                    # ti should be skipped, set ti.state to SKIPPED and fail 
the rule so that the
+                    # ti does not execute.
+                    ti.set_state(State.SKIPPED, session)
+                    yield self._failing_status(
+                        reason="Skipping because of previous XCom result from 
parent task {}"
+                        .format(parent.task_id)
+                    )
+                    return
diff --git a/requirements/requirements-python2.7.txt 
b/requirements/requirements-python2.7.txt
index 6973e5a..2dc0f9b 100644
--- a/requirements/requirements-python2.7.txt
+++ b/requirements/requirements-python2.7.txt
@@ -8,12 +8,12 @@ Flask-Caching==1.3.3
 Flask-JWT-Extended==3.24.1
 Flask-Login==0.4.1
 Flask-OpenID==1.2.5
-Flask-SQLAlchemy==2.4.3
+Flask-SQLAlchemy==2.4.4
 Flask-WTF==0.14.3
 Flask==1.1.2
 JPype1==0.7.1
 JayDeBeApi==1.2.3
-Jinja2==2.10.3
+Jinja2==2.11.2
 Mako==1.1.3
 Markdown==2.6.11
 MarkupSafe==1.1.1
@@ -38,7 +38,7 @@ ansiwrap==0.8.4
 apipkg==1.5
 apispec==2.0.2
 appdirs==1.4.4
-argcomplete==1.11.1
+argcomplete==1.12.0
 asn1crypto==1.3.0
 aspy.yaml==1.3.0
 astroid==1.6.6
@@ -48,11 +48,11 @@ attrs==19.3.0
 aws-sam-translator==1.25.0
 aws-xray-sdk==2.6.0
 azure-common==1.1.25
-azure-cosmos==3.1.2
+azure-cosmos==3.2.0
 azure-datalake-store==0.0.48
 azure-mgmt-containerinstance==1.5.0
 azure-mgmt-nspkg==3.0.2
-azure-mgmt-resource==10.0.0
+azure-mgmt-resource==10.1.0
 azure-nspkg==3.0.2
 azure-storage-blob==2.1.0
 azure-storage-common==2.1.0
@@ -69,9 +69,9 @@ beautifulsoup4==4.7.1
 billiard==3.6.3.0
 bleach==3.1.5
 blinker==1.4
-boto3==1.14.14
+boto3==1.14.25
 boto==2.49.0
-botocore==1.17.14
+botocore==1.17.25
 cached-property==1.5.1
 cachetools==3.1.1
 cassandra-driver==3.20.2
@@ -80,7 +80,7 @@ celery==4.4.6
 certifi==2020.6.20
 cffi==1.14.0
 cfgv==2.0.1
-cfn-lint==0.33.2
+cfn-lint==0.34.0
 cgroupspy==0.1.6
 chardet==3.0.4
 click==6.7
@@ -91,11 +91,11 @@ configparser==3.5.3
 contextdecorator==0.10.0
 contextlib2==0.6.0.post1
 cookies==2.2.1
-coverage==5.1
+coverage==5.2
 croniter==0.3.34
-cryptography==2.9.2
+cryptography==3.0
 cx-Oracle==7.3.0
-datadog==0.37.1
+datadog==0.38.0
 decorator==4.4.2
 defusedxml==0.6.0
 dill==0.3.2
@@ -112,13 +112,13 @@ email-validator==1.1.1
 entrypoints==0.3
 enum34==1.1.10
 execnet==1.7.1
-fastavro==0.23.5
+fastavro==0.23.6
 filelock==3.0.12
 flake8-colors==0.1.6
 flake8==3.8.3
-flaky==3.6.1
-flask-swagger==0.2.13
-flower==0.9.4
+flaky==3.7.0
+flask-swagger==0.2.14
+flower==0.9.5
 freezegun==0.3.15
 funcsigs==1.0.2
 functools32==3.2.3.post2
@@ -126,13 +126,13 @@ future-fstrings==1.2.0
 future==0.18.2
 futures==3.3.0
 gcsfs==0.2.3
-google-api-core==1.21.0
-google-api-python-client==1.9.3
-google-auth-httplib2==0.0.3
+google-api-core==1.22.0
+google-api-python-client==1.10.0
+google-auth-httplib2==0.0.4
 google-auth-oauthlib==0.4.1
-google-auth==1.18.0
-google-cloud-bigquery==1.25.0
-google-cloud-bigtable==1.2.1
+google-auth==1.19.2
+google-cloud-bigquery==1.26.0
+google-cloud-bigtable==1.3.0
 google-cloud-container==1.0.1
 google-cloud-core==1.3.0
 google-cloud-dlp==1.0.0
@@ -147,7 +147,7 @@ google-cloud-videointelligence==1.15.0
 google-cloud-vision==1.0.0
 google-resumable-media==0.5.1
 googleapis-common-protos==1.52.0
-graphviz==0.14
+graphviz==0.14.1
 grpc-google-iam-v1==0.12.3
 grpcio-gcp==0.2.2
 grpcio==1.30.0
@@ -155,9 +155,9 @@ gunicorn==19.10.0
 hdfs==2.5.8
 hmsclient==0.1.1
 httplib2==0.18.1
-humanize==0.5.1
+humanize==1.0.0
 hvac==0.10.4
-identify==1.4.21
+identify==1.4.25
 idna==2.10
 ijson==2.6.1
 imagesize==1.2.0
@@ -230,9 +230,10 @@ pluggy==0.13.1
 pre-commit==1.21.0
 presto-python-client==0.7.0
 prison==0.1.0
+prometheus-client==0.8.0
 prompt-toolkit==1.0.18
 protobuf==3.12.2
-psutil==5.7.0
+psutil==5.7.2
 psycopg2-binary==2.8.5
 ptyprocess==0.6.0
 py==1.9.0
@@ -255,8 +256,8 @@ pytest-cov==2.10.0
 pytest-forked==1.2.0
 pytest-instafail==0.4.2
 pytest-rerunfailures==9.0
-pytest-timeout==1.4.1
-pytest-xdist==1.32.0
+pytest-timeout==1.4.2
+pytest-xdist==1.33.0
 pytest==4.6.11
 python-daemon==2.2.4
 python-dateutil==2.8.1
@@ -268,7 +269,7 @@ python-nvd3==0.15.0
 python-openid==2.2.5
 python-slugify==4.0.1
 pytz==2020.1
-pytzdata==2019.3
+pytzdata==2020.1
 pywinrm==0.4.1
 pyzmq==19.0.1
 qds-sdk==1.16.0
@@ -287,7 +288,7 @@ sasl==0.2.1
 scandir==1.10.0
 sendgrid==5.6.0
 sentinels==1.0.0
-sentry-sdk==0.15.1
+sentry-sdk==0.16.1
 setproctitle==1.1.10
 simplegeneric==0.8.1
 singledispatch==3.4.0.3
@@ -318,11 +319,11 @@ thrift==0.13.0
 tokenize-rt==3.2.0
 toml==0.10.1
 tornado==5.1.1
-tqdm==4.47.0
+tqdm==4.48.0
 traceback2==1.4.0
 traitlets==4.3.3
 typing-extensions==3.7.4.2
-typing==3.7.4.1
+typing==3.7.4.3
 tzlocal==1.5.1
 unicodecsv==0.14.1
 unittest2==1.1.0
@@ -330,13 +331,13 @@ uritemplate==3.0.1
 urllib3==1.25.9
 vertica-python==0.10.4
 vine==1.3.0
-virtualenv==20.0.25
+virtualenv==20.0.27
 wcwidth==0.2.5
 webencodings==0.5.1
 websocket-client==0.57.0
 wrapt==1.12.1
 xmltodict==0.12.0
-yamllint==1.23.0
+yamllint==1.24.2
 zdesk==2.7.1
 zipp==1.2.0
 zope.deprecation==4.4.0
diff --git a/requirements/requirements-python3.8.txt 
b/requirements/requirements-python3.8.txt
index 747ae42..e715477 100644
--- a/requirements/requirements-python3.8.txt
+++ b/requirements/requirements-python3.8.txt
@@ -8,12 +8,12 @@ Flask-Caching==1.3.3
 Flask-JWT-Extended==3.24.1
 Flask-Login==0.4.1
 Flask-OpenID==1.2.5
-Flask-SQLAlchemy==2.4.3
+Flask-SQLAlchemy==2.4.4
 Flask-WTF==0.14.3
 Flask==1.1.2
 JPype1==0.7.1
 JayDeBeApi==1.2.3
-Jinja2==2.10.3
+Jinja2==2.11.2
 Mako==1.1.3
 Markdown==2.6.11
 MarkupSafe==1.1.1
@@ -24,9 +24,9 @@ PySmbClient==0.1.5
 PyYAML==5.3.1
 Pygments==2.6.1
 SQLAlchemy-JSONField==0.9.0
-SQLAlchemy-Utils==0.36.6
+SQLAlchemy-Utils==0.36.8
 SQLAlchemy==1.3.18
-Sphinx==3.1.1
+Sphinx==3.1.2
 Unidecode==1.1.1
 WTForms==2.3.1
 Werkzeug==0.16.1
@@ -39,7 +39,7 @@ ansiwrap==0.8.4
 apipkg==1.5
 apispec==1.3.3
 appdirs==1.4.4
-argcomplete==1.11.1
+argcomplete==1.12.0
 asn1crypto==1.3.0
 astroid==2.4.2
 async-generator==1.10
@@ -48,10 +48,10 @@ attrs==19.3.0
 aws-sam-translator==1.25.0
 aws-xray-sdk==2.6.0
 azure-common==1.1.25
-azure-cosmos==3.1.2
+azure-cosmos==3.2.0
 azure-datalake-store==0.0.48
 azure-mgmt-containerinstance==1.5.0
-azure-mgmt-resource==10.0.0
+azure-mgmt-resource==10.1.0
 azure-nspkg==3.0.2
 azure-storage-blob==2.1.0
 azure-storage-common==2.1.0
@@ -62,9 +62,9 @@ beautifulsoup4==4.7.1
 billiard==3.6.3.0
 black==19.10b0
 blinker==1.4
-boto3==1.14.14
+boto3==1.14.25
 boto==2.49.0
-botocore==1.17.14
+botocore==1.17.25
 cached-property==1.5.1
 cachetools==4.1.1
 cassandra-driver==3.20.2
@@ -73,7 +73,7 @@ celery==4.4.6
 certifi==2020.6.20
 cffi==1.14.0
 cfgv==3.1.0
-cfn-lint==0.33.2
+cfn-lint==0.34.0
 cgroupspy==0.1.6
 chardet==3.0.4
 click==6.7
@@ -81,11 +81,11 @@ cloudant==0.5.10
 colorama==0.4.3
 colorlog==4.0.2
 configparser==3.5.3
-coverage==5.1
+coverage==5.2
 croniter==0.3.34
-cryptography==2.9.2
+cryptography==3.0
 cx-Oracle==8.0.0
-datadog==0.37.1
+datadog==0.38.0
 decorator==4.4.2
 defusedxml==0.6.0
 dill==0.3.2
@@ -101,27 +101,27 @@ elasticsearch==5.5.3
 email-validator==1.1.1
 entrypoints==0.3
 execnet==1.7.1
-fastavro==0.23.5
+fastavro==0.23.6
 filelock==3.0.12
 flake8-colors==0.1.6
 flake8==3.8.3
-flaky==3.6.1
-flask-swagger==0.2.13
-flower==0.9.4
+flaky==3.7.0
+flask-swagger==0.2.14
+flower==0.9.5
 freezegun==0.3.15
 fsspec==0.7.4
 funcsigs==1.0.2
 future-fstrings==1.2.0
 future==0.18.2
 gcsfs==0.6.2
-google-api-core==1.21.0
-google-api-python-client==1.9.3
-google-auth-httplib2==0.0.3
+google-api-core==1.22.0
+google-api-python-client==1.10.0
+google-auth-httplib2==0.0.4
 google-auth-oauthlib==0.4.1
-google-auth==1.18.0
-google-cloud-bigquery==1.25.0
-google-cloud-bigtable==1.2.1
-google-cloud-container==1.0.1
+google-auth==1.19.2
+google-cloud-bigquery==1.26.0
+google-cloud-bigtable==1.3.0
+google-cloud-container==2.0.0
 google-cloud-core==1.3.0
 google-cloud-dlp==1.0.0
 google-cloud-language==1.3.0
@@ -135,17 +135,17 @@ google-cloud-videointelligence==1.15.0
 google-cloud-vision==1.0.0
 google-resumable-media==0.5.1
 googleapis-common-protos==1.52.0
-graphviz==0.14
+graphviz==0.14.1
 grpc-google-iam-v1==0.12.3
 grpcio-gcp==0.2.2
 grpcio==1.30.0
-gunicorn==19.10.0
+gunicorn==20.0.4
 hdfs==2.5.8
 hmsclient==0.1.1
 httplib2==0.18.1
-humanize==0.5.1
+humanize==2.5.0
 hvac==0.10.4
-identify==1.4.21
+identify==1.4.25
 idna==2.10
 imagesize==1.2.0
 importlib-metadata==1.7.0
@@ -156,7 +156,7 @@ ipython==7.16.1
 iso8601==0.1.12
 isodate==0.6.0
 itsdangerous==1.1.0
-jedi==0.17.1
+jedi==0.17.2
 jira==2.0.0
 jmespath==0.10.0
 json-merge-patch==0.2
@@ -166,12 +166,13 @@ jsonpickle==1.4.1
 jsonpointer==2.0
 jsonschema==3.2.0
 junit-xml==1.9
-jupyter-client==6.1.5
+jupyter-client==6.1.6
 jupyter-core==4.6.3
 kombu==4.6.11
 kubernetes==11.0.0
 lazy-object-proxy==1.5.0
 ldap3==2.7
+libcst==0.3.7
 lockfile==0.12.2
 marshmallow-enum==1.5.1
 marshmallow-sqlalchemy==0.23.1
@@ -188,14 +189,14 @@ mypy-extensions==0.4.3
 mypy==0.720
 mysqlclient==1.3.14
 natsort==7.0.1
-nbclient==0.4.0
+nbclient==0.4.1
 nbformat==5.0.7
-nest-asyncio==1.3.3
+nest-asyncio==1.4.0
 networkx==2.4
 nodeenv==1.4.0
 nteract-scrapbook==0.4.1
 ntlm-auth==1.5.0
-numpy==1.19.0
+numpy==1.19.1
 oauthlib==3.1.0
 oscrypto==1.2.0
 packaging==20.4
@@ -212,12 +213,14 @@ pexpect==4.8.0
 pickleshare==0.7.5
 pinotdb==0.1.1
 pluggy==0.13.1
-pre-commit==2.5.1
+pre-commit==2.6.0
 presto-python-client==0.7.0
 prison==0.1.3
+prometheus-client==0.8.0
 prompt-toolkit==3.0.5
+proto-plus==1.3.2
 protobuf==3.12.2
-psutil==5.7.0
+psutil==5.7.2
 psycopg2-binary==2.8.5
 ptyprocess==0.6.0
 py==1.9.0
@@ -240,8 +243,8 @@ pytest-cov==2.10.0
 pytest-forked==1.2.0
 pytest-instafail==0.4.2
 pytest-rerunfailures==9.0
-pytest-timeout==1.4.1
-pytest-xdist==1.32.0
+pytest-timeout==1.4.2
+pytest-xdist==1.33.0
 pytest==5.4.3
 python-daemon==2.2.4
 python-dateutil==2.8.1
@@ -253,12 +256,12 @@ python-nvd3==0.15.0
 python-slugify==4.0.1
 python3-openid==3.2.0
 pytz==2020.1
-pytzdata==2019.3
+pytzdata==2020.1
 pywinrm==0.4.1
 pyzmq==19.0.1
 qds-sdk==1.16.0
 redis==3.5.3
-regex==2020.6.8
+regex==2020.7.14
 requests-futures==0.9.4
 requests-kerberos==0.12.0
 requests-mock==1.8.0
@@ -272,12 +275,12 @@ s3transfer==0.3.3
 sasl==0.2.1
 sendgrid==5.6.0
 sentinels==1.0.0
-sentry-sdk==0.15.1
+sentry-sdk==0.16.1
 setproctitle==1.1.10
 six==1.15.0
 slackclient==1.3.2
 snowballstemmer==2.0.0
-snowflake-connector-python==2.2.8
+snowflake-connector-python==2.2.9
 snowflake-sqlalchemy==1.2.3
 soupsieve==2.0.1
 sphinx-argparse==0.2.5
@@ -304,22 +307,23 @@ thrift-sasl==0.4.2
 thrift==0.13.0
 toml==0.10.1
 tornado==5.1.1
-tqdm==4.47.0
+tqdm==4.48.0
 traitlets==4.3.3
 typed-ast==1.4.1
 typing-extensions==3.7.4.2
+typing-inspect==0.6.0
 tzlocal==1.5.1
 unicodecsv==0.14.1
 uritemplate==3.0.1
 urllib3==1.25.9
 vertica-python==0.10.4
 vine==1.3.0
-virtualenv==20.0.25
+virtualenv==20.0.27
 wcwidth==0.2.5
 websocket-client==0.57.0
 wrapt==1.12.1
 xmltodict==0.12.0
-yamllint==1.23.0
+yamllint==1.24.2
 zdesk==2.7.1
 zipp==3.1.0
 zope.deprecation==4.4.0
diff --git a/requirements/setup-2.7.md5 b/requirements/setup-2.7.md5
index 7302c51..d24fa17 100644
--- a/requirements/setup-2.7.md5
+++ b/requirements/setup-2.7.md5
@@ -1 +1 @@
-da591fb5f6ed08129068e227610706cb  /opt/airflow/setup.py
+52a5d9b968ee82e35b5b49ed02361377  /opt/airflow/setup.py
diff --git a/requirements/setup-3.8.md5 b/requirements/setup-3.8.md5
index 7302c51..d24fa17 100644
--- a/requirements/setup-3.8.md5
+++ b/requirements/setup-3.8.md5
@@ -1 +1 @@
-da591fb5f6ed08129068e227610706cb  /opt/airflow/setup.py
+52a5d9b968ee82e35b5b49ed02361377  /opt/airflow/setup.py
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 48f70a9..161e479 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -3011,3 +3011,42 @@ class SchedulerJobTest(unittest.TestCase):
                 self.assertIsNone(start_date)
                 self.assertIsNone(end_date)
                 self.assertIsNone(duration)
+
+
+def test_task_with_upstream_skip_process_task_instances():
+    """
+    Test if _process_task_instances puts a task instance into SKIPPED state if 
any of its
+    upstream tasks are skipped according to TriggerRuleDep.
+    """
+    with DAG(
+        dag_id='test_task_with_upstream_skip_dag',
+        start_date=DEFAULT_DATE,
+        schedule_interval=None
+    ) as dag:
+        dummy1 = DummyOperator(task_id='dummy1')
+        dummy2 = DummyOperator(task_id="dummy2")
+        dummy3 = DummyOperator(task_id="dummy3")
+        [dummy1, dummy2] >> dummy3
+
+    dag_file_processor = SchedulerJob(dag_ids=[], log=mock.MagicMock())
+    dag.clear()
+    dr = 
dag.create_dagrun(run_id="manual__{}".format(DEFAULT_DATE.isoformat()),
+                           state=State.RUNNING,
+                           execution_date=DEFAULT_DATE)
+    assert dr is not None
+
+    with create_session() as session:
+        tis = {ti.task_id: ti for ti in dr.get_task_instances(session=session)}
+        # Set dummy1 to skipped and dummy2 to success. dummy3 remains as none.
+        tis[dummy1.task_id].state = State.SKIPPED
+        tis[dummy2.task_id].state = State.SUCCESS
+        assert tis[dummy3.task_id].state == State.NONE
+
+    dag_file_processor._process_task_instances(dag, task_instances_list=Mock())
+
+    with create_session() as session:
+        tis = {ti.task_id: ti for ti in dr.get_task_instances(session=session)}
+        assert tis[dummy1.task_id].state == State.SKIPPED
+        assert tis[dummy2.task_id].state == State.SUCCESS
+        # dummy3 should be skipped because dummy1 is skipped.
+        assert tis[dummy3.task_id].state == State.SKIPPED
diff --git a/tests/operators/test_latest_only_operator.py 
b/tests/operators/test_latest_only_operator.py
index 3edff8d..6f23f59 100644
--- a/tests/operators/test_latest_only_operator.py
+++ b/tests/operators/test_latest_only_operator.py
@@ -47,15 +47,40 @@ def get_task_instances(task_id):
 
 
 class LatestOnlyOperatorTest(unittest.TestCase):
+    @classmethod
+    def setUpClass(cls):
+        from tests.compat import MagicMock
+        from airflow.jobs import SchedulerJob
 
-    def setUp(self):
-        super(LatestOnlyOperatorTest, self).setUp()
-        self.dag = DAG(
+        cls.dag = DAG(
             'test_dag',
             default_args={
                 'owner': 'airflow',
                 'start_date': DEFAULT_DATE},
             schedule_interval=INTERVAL)
+
+        cls.dag.create_dagrun(
+            run_id="manual__1",
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING
+        )
+
+        cls.dag.create_dagrun(
+            run_id="manual__2",
+            execution_date=timezone.datetime(2016, 1, 1, 12),
+            state=State.RUNNING
+        )
+
+        cls.dag.create_dagrun(
+            run_id="manual__3",
+            execution_date=END_DATE,
+            state=State.RUNNING
+        )
+
+        cls.dag_file_processor = SchedulerJob(dag_ids=[], log=MagicMock())
+
+    def setUp(self):
+        super(LatestOnlyOperatorTest, self).setUp()
         self.addCleanup(self.dag.clear)
         freezer = freeze_time(FROZEN_NOW)
         freezer.start()
@@ -86,6 +111,7 @@ class LatestOnlyOperatorTest(unittest.TestCase):
         downstream_task2.run(start_date=DEFAULT_DATE, end_date=END_DATE)
 
         latest_instances = get_task_instances('latest')
+        self.dag_file_processor._process_task_instances(self.dag, 
task_instances_list=latest_instances)
         exec_date_to_latest_state = {
             ti.execution_date: ti.state for ti in latest_instances}
         self.assertEqual({
@@ -95,6 +121,7 @@ class LatestOnlyOperatorTest(unittest.TestCase):
             exec_date_to_latest_state)
 
         downstream_instances = get_task_instances('downstream')
+        self.dag_file_processor._process_task_instances(self.dag, 
task_instances_list=downstream_instances)
         exec_date_to_downstream_state = {
             ti.execution_date: ti.state for ti in downstream_instances}
         self.assertEqual({
@@ -104,6 +131,7 @@ class LatestOnlyOperatorTest(unittest.TestCase):
             exec_date_to_downstream_state)
 
         downstream_instances = get_task_instances('downstream_2')
+        self.dag_file_processor._process_task_instances(self.dag, 
task_instances_list=downstream_instances)
         exec_date_to_downstream_state = {
             ti.execution_date: ti.state for ti in downstream_instances}
         self.assertEqual({
@@ -126,32 +154,13 @@ class LatestOnlyOperatorTest(unittest.TestCase):
         downstream_task.set_upstream(latest_task)
         downstream_task2.set_upstream(downstream_task)
 
-        self.dag.create_dagrun(
-            run_id="manual__1",
-            start_date=timezone.utcnow(),
-            execution_date=DEFAULT_DATE,
-            state=State.RUNNING
-        )
-
-        self.dag.create_dagrun(
-            run_id="manual__2",
-            start_date=timezone.utcnow(),
-            execution_date=timezone.datetime(2016, 1, 1, 12),
-            state=State.RUNNING
-        )
-
-        self.dag.create_dagrun(
-            run_id="manual__3",
-            start_date=timezone.utcnow(),
-            execution_date=END_DATE,
-            state=State.RUNNING
-        )
-
         latest_task.run(start_date=DEFAULT_DATE, end_date=END_DATE)
         downstream_task.run(start_date=DEFAULT_DATE, end_date=END_DATE)
         downstream_task2.run(start_date=DEFAULT_DATE, end_date=END_DATE)
 
         latest_instances = get_task_instances('latest')
+        self.dag_file_processor._process_task_instances(self.dag, task_instances_list=latest_instances)
+
         exec_date_to_latest_state = {
             ti.execution_date: ti.state for ti in latest_instances}
         self.assertEqual({
@@ -161,6 +170,8 @@ class LatestOnlyOperatorTest(unittest.TestCase):
             exec_date_to_latest_state)
 
         downstream_instances = get_task_instances('downstream')
+        self.dag_file_processor._process_task_instances(self.dag, task_instances_list=downstream_instances)
+
         exec_date_to_downstream_state = {
             ti.execution_date: ti.state for ti in downstream_instances}
         self.assertEqual({
@@ -170,6 +181,7 @@ class LatestOnlyOperatorTest(unittest.TestCase):
             exec_date_to_downstream_state)
 
         downstream_instances = get_task_instances('downstream_2')
+        self.dag_file_processor._process_task_instances(self.dag, task_instances_list=downstream_instances)
         exec_date_to_downstream_state = {
             ti.execution_date: ti.state for ti in downstream_instances}
         self.assertEqual({
diff --git a/tests/operators/test_python_operator.py b/tests/operators/test_python_operator.py
index 6f3dfe2..a92213a 100644
--- a/tests/operators/test_python_operator.py
+++ b/tests/operators/test_python_operator.py
@@ -32,6 +32,7 @@ from datetime import timedelta, date
 
 from airflow.exceptions import AirflowException
 from airflow.models import TaskInstance as TI, DAG, DagRun
+from airflow.models.taskinstance import clear_task_instances
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
 from airflow.operators.python_operator import ShortCircuitOperator
@@ -491,7 +492,7 @@ class BranchOperatorTest(unittest.TestCase):
             elif ti.task_id == 'branch_2':
                 self.assertEqual(ti.state, State.NONE)
             else:
-                raise Exception
+                raise
 
     def test_with_skip_in_branch_downstream_dependencies2(self):
         self.branch_op = BranchPythonOperator(task_id='make_choice',
@@ -520,7 +521,63 @@ class BranchOperatorTest(unittest.TestCase):
             elif ti.task_id == 'branch_2':
                 self.assertEqual(ti.state, State.NONE)
             else:
-                raise Exception
+                raise
+
+    def test_clear_skipped_downstream_task(self):
+        """
+        After a downstream task is skipped by BranchPythonOperator, clearing the skipped task
+        should not cause it to be executed.
+        """
+        branch_op = BranchPythonOperator(task_id='make_choice',
+                                         dag=self.dag,
+                                         python_callable=lambda: 'branch_1')
+        branches = [self.branch_1, self.branch_2]
+        branch_op >> branches
+        self.dag.clear()
+
+        dr = self.dag.create_dagrun(
+            run_id="manual__",
+            start_date=timezone.utcnow(),
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING
+        )
+
+        branch_op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+        for task in branches:
+            task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+        tis = dr.get_task_instances()
+        for ti in tis:
+            if ti.task_id == 'make_choice':
+                self.assertEqual(ti.state, State.SUCCESS)
+            elif ti.task_id == 'branch_1':
+                self.assertEqual(ti.state, State.SUCCESS)
+            elif ti.task_id == 'branch_2':
+                self.assertEqual(ti.state, State.SKIPPED)
+            else:
+                raise
+
+        children_tis = [ti for ti in tis if ti.task_id in branch_op.get_direct_relative_ids()]
+
+        # Clear the children tasks.
+        with create_session() as session:
+            clear_task_instances(children_tis, session=session, dag=self.dag)
+
+        # Run the cleared tasks again.
+        for task in branches:
+            task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+        # Check if the states are correct after children tasks are cleared.
+        for ti in dr.get_task_instances():
+            if ti.task_id == 'make_choice':
+                self.assertEqual(ti.state, State.SUCCESS)
+            elif ti.task_id == 'branch_1':
+                self.assertEqual(ti.state, State.SUCCESS)
+            elif ti.task_id == 'branch_2':
+                self.assertEqual(ti.state, State.SKIPPED)
+            else:
+                raise
 
 
 class ShortCircuitOperatorTest(unittest.TestCase):
@@ -660,3 +717,61 @@ class ShortCircuitOperatorTest(unittest.TestCase):
                 self.assertEqual(ti.state, State.NONE)
             else:
                 raise
+
+    def test_clear_skipped_downstream_task(self):
+        """
+        After a downstream task is skipped by ShortCircuitOperator, clearing the skipped task
+        should not cause it to be executed.
+        """
+        dag = DAG('shortcircuit_clear_skipped_downstream_task',
+                  default_args={
+                      'owner': 'airflow',
+                      'start_date': DEFAULT_DATE
+                  },
+                  schedule_interval=INTERVAL)
+        short_op = ShortCircuitOperator(task_id='make_choice',
+                                        dag=dag,
+                                        python_callable=lambda: False)
+        downstream = DummyOperator(task_id='downstream', dag=dag)
+
+        short_op >> downstream
+
+        dag.clear()
+
+        dr = dag.create_dagrun(
+            run_id="manual__",
+            start_date=timezone.utcnow(),
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING
+        )
+
+        short_op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+        downstream.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+        tis = dr.get_task_instances()
+
+        for ti in tis:
+            if ti.task_id == 'make_choice':
+                self.assertEqual(ti.state, State.SUCCESS)
+            elif ti.task_id == 'downstream':
+                self.assertEqual(ti.state, State.SKIPPED)
+            else:
+                raise
+
+        # Clear downstream
+        with create_session() as session:
+            clear_task_instances([t for t in tis if t.task_id == "downstream"],
+                                 session=session,
+                                 dag=dag)
+
+        # Run downstream again
+        downstream.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+        # Check if the states are correct.
+        for ti in dr.get_task_instances():
+            if ti.task_id == 'make_choice':
+                self.assertEqual(ti.state, State.SUCCESS)
+            elif ti.task_id == 'downstream':
+                self.assertEqual(ti.state, State.SKIPPED)
+            else:
+                raise
diff --git a/tests/ti_deps/deps/test_not_previously_skipped_dep.py b/tests/ti_deps/deps/test_not_previously_skipped_dep.py
new file mode 100644
index 0000000..30da9cf
--- /dev/null
+++ b/tests/ti_deps/deps/test_not_previously_skipped_dep.py
@@ -0,0 +1,133 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pendulum
+
+from airflow.models import DAG, TaskInstance
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.operators.python_operator import BranchPythonOperator
+from airflow.ti_deps.dep_context import DepContext
+from airflow.ti_deps.deps.not_previously_skipped_dep import NotPreviouslySkippedDep
+from airflow.utils.db import create_session
+from airflow.utils.state import State
+
+
+def test_no_parent():
+    """
+    A simple DAG with a single task. NotPreviouslySkippedDep is met.
+    """
+    start_date = pendulum.datetime(2020, 1, 1)
+    dag = DAG("test_test_no_parent_dag", schedule_interval=None, start_date=start_date)
+    op1 = DummyOperator(task_id="op1", dag=dag)
+
+    ti1 = TaskInstance(op1, start_date)
+
+    with create_session() as session:
+        dep = NotPreviouslySkippedDep()
+        assert len(list(dep.get_dep_statuses(ti1, session, DepContext()))) == 0
+        assert dep.is_met(ti1, session)
+        assert ti1.state != State.SKIPPED
+
+
+def test_no_skipmixin_parent():
+    """
+    A simple DAG with no branching. Both op1 and op2 are DummyOperator. NotPreviouslySkippedDep is met.
+    """
+    start_date = pendulum.datetime(2020, 1, 1)
+    dag = DAG(
+        "test_no_skipmixin_parent_dag", schedule_interval=None, start_date=start_date
+    )
+    op1 = DummyOperator(task_id="op1", dag=dag)
+    op2 = DummyOperator(task_id="op2", dag=dag)
+    op1 >> op2
+
+    ti2 = TaskInstance(op2, start_date)
+
+    with create_session() as session:
+        dep = NotPreviouslySkippedDep()
+        assert len(list(dep.get_dep_statuses(ti2, session, DepContext()))) == 0
+        assert dep.is_met(ti2, session)
+        assert ti2.state != State.SKIPPED
+
+
+def test_parent_follow_branch():
+    """
+    A simple DAG with a BranchPythonOperator that follows op2. NotPreviouslySkippedDep is met.
+    """
+    start_date = pendulum.datetime(2020, 1, 1)
+    dag = DAG(
+        "test_parent_follow_branch_dag", schedule_interval=None, start_date=start_date
+    )
+    op1 = BranchPythonOperator(task_id="op1", python_callable=lambda: "op2", dag=dag)
+    op2 = DummyOperator(task_id="op2", dag=dag)
+    op1 >> op2
+
+    TaskInstance(op1, start_date).run()
+    ti2 = TaskInstance(op2, start_date)
+
+    with create_session() as session:
+        dep = NotPreviouslySkippedDep()
+        assert len(list(dep.get_dep_statuses(ti2, session, DepContext()))) == 0
+        assert dep.is_met(ti2, session)
+        assert ti2.state != State.SKIPPED
+
+
+def test_parent_skip_branch():
+    """
+    A simple DAG with a BranchPythonOperator that does not follow op2. NotPreviouslySkippedDep is not met.
+    """
+    start_date = pendulum.datetime(2020, 1, 1)
+    dag = DAG(
+        "test_parent_skip_branch_dag", schedule_interval=None, start_date=start_date
+    )
+    op1 = BranchPythonOperator(task_id="op1", python_callable=lambda: "op3", dag=dag)
+    op2 = DummyOperator(task_id="op2", dag=dag)
+    op3 = DummyOperator(task_id="op3", dag=dag)
+    op1 >> [op2, op3]
+
+    TaskInstance(op1, start_date).run()
+    ti2 = TaskInstance(op2, start_date)
+
+    with create_session() as session:
+        dep = NotPreviouslySkippedDep()
+        assert len(list(dep.get_dep_statuses(ti2, session, DepContext()))) == 1
+        assert not dep.is_met(ti2, session)
+        assert ti2.state == State.SKIPPED
+
+
+def test_parent_not_executed():
+    """
+    A simple DAG with a BranchPythonOperator that does not follow op2. Parent task is not yet
+    executed (no xcom data). NotPreviouslySkippedDep is met (no decision).
+    """
+    start_date = pendulum.datetime(2020, 1, 1)
+    dag = DAG(
+        "test_parent_not_executed_dag", schedule_interval=None, start_date=start_date
+    )
+    op1 = BranchPythonOperator(task_id="op1", python_callable=lambda: "op3", dag=dag)
+    op2 = DummyOperator(task_id="op2", dag=dag)
+    op3 = DummyOperator(task_id="op3", dag=dag)
+    op1 >> [op2, op3]
+
+    ti2 = TaskInstance(op2, start_date)
+
+    with create_session() as session:
+        dep = NotPreviouslySkippedDep()
+        assert len(list(dep.get_dep_statuses(ti2, session, DepContext()))) == 0
+        assert dep.is_met(ti2, session)
+        assert ti2.state == State.NONE
diff --git a/tests/ti_deps/deps/test_trigger_rule_dep.py b/tests/ti_deps/deps/test_trigger_rule_dep.py
index 45514f6..8255015 100644
--- a/tests/ti_deps/deps/test_trigger_rule_dep.py
+++ b/tests/ti_deps/deps/test_trigger_rule_dep.py
@@ -165,6 +165,46 @@ class TriggerRuleDepTest(unittest.TestCase):
         self.assertEqual(len(dep_statuses), 1)
         self.assertFalse(dep_statuses[0].passed)
 
+    def test_all_success_tr_skip(self):
+        """
+        All-success trigger rule fails when some upstream tasks are skipped.
+        """
+        ti = self._get_task_instance(TriggerRule.ALL_SUCCESS,
+                                     upstream_task_ids=["FakeTaskID",
+                                                        "OtherFakeTaskID"])
+        dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule(
+            ti=ti,
+            successes=1,
+            skipped=1,
+            failed=0,
+            upstream_failed=0,
+            done=2,
+            flag_upstream_failed=False,
+            session="Fake Session"))
+        self.assertEqual(len(dep_statuses), 1)
+        self.assertFalse(dep_statuses[0].passed)
+
+    def test_all_success_tr_skip_flag_upstream(self):
+        """
+        All-success trigger rule fails when some upstream tasks are skipped. The state of the ti
+        should be set to SKIPPED when flag_upstream_failed is True.
+        """
+        ti = self._get_task_instance(TriggerRule.ALL_SUCCESS,
+                                     upstream_task_ids=["FakeTaskID",
+                                                        "OtherFakeTaskID"])
+        dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule(
+            ti=ti,
+            successes=1,
+            skipped=1,
+            failed=0,
+            upstream_failed=0,
+            done=2,
+            flag_upstream_failed=True,
+            session=Mock()))
+        self.assertEqual(len(dep_statuses), 1)
+        self.assertFalse(dep_statuses[0].passed)
+        self.assertEqual(ti.state, State.SKIPPED)
+
     def test_none_failed_tr_success(self):
         """
         All success including skip trigger rule success

Reply via email to