This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8e7cb1f  Add ignore_first_depends_on_past for scheduled jobs (#22491)
8e7cb1f is described below

commit 8e7cb1f0546382ca9b141603b21850f1d6019ee1
Author: Ping Zhang <[email protected]>
AuthorDate: Thu Mar 24 18:37:54 2022 -0700

    Add ignore_first_depends_on_past for scheduled jobs (#22491)
---
 airflow/config_templates/config.yml           | 11 ++++++
 airflow/config_templates/default_airflow.cfg  |  7 ++++
 airflow/models/baseoperator.py                |  4 ++
 airflow/serialization/schema.json             |  1 +
 airflow/ti_deps/deps/prev_dagrun_dep.py       | 20 ++++++++++
 tests/serialization/test_dag_serialization.py |  1 +
 tests/ti_deps/deps/test_prev_dagrun_dep.py    | 55 ++++++++++++++++++++++++++-
 7 files changed, 98 insertions(+), 1 deletion(-)

diff --git a/airflow/config_templates/config.yml 
b/airflow/config_templates/config.yml
index 80bc374..dafdeae 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1949,6 +1949,17 @@
       type: string
       example: ~
       default: "True"
+    - name: ignore_first_depends_on_past_by_default
+      description: |
+        Setting this to True will make the first task instance of a task
+        ignore the depends_on_past setting. A task instance will be considered
+        the first task instance of its task when there is no task instance
+        in the DB with an execution_date earlier than it, i.e. no manual
+        marking of success will be needed for a newly added task to be scheduled.
+      version_added: 2.3.0
+      type: string
+      example: ~
+      default: "True"
     - name: max_tis_per_query
       description: |
         This changes the batch size of queries in the scheduling main loop.
diff --git a/airflow/config_templates/default_airflow.cfg 
b/airflow/config_templates/default_airflow.cfg
index 4c5754a..3921d88 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -980,6 +980,13 @@ zombie_detection_interval = 10.0
 # DAG definition (catchup)
 catchup_by_default = True
 
+# Setting this to True will make the first task instance of a task
+# ignore the depends_on_past setting. A task instance will be considered
+# the first task instance of its task when there is no task instance
+# in the DB with an execution_date earlier than it, i.e. no manual
+# marking of success will be needed for a newly added task to be scheduled.
+ignore_first_depends_on_past_by_default = True
+
 # This changes the batch size of queries in the scheduling main loop.
 # If this is too high, SQL query performance may be impacted by
 # complexity of query predicate, and/or excessive locking.
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index c3fcbc8..2d29bc9 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -714,6 +714,9 @@ class BaseOperator(AbstractOperator, 
metaclass=BaseOperatorMeta):
         start_date: Optional[datetime] = None,
         end_date: Optional[datetime] = None,
         depends_on_past: bool = False,
+        ignore_first_depends_on_past: bool = conf.getboolean(
+            'scheduler', 'ignore_first_depends_on_past_by_default'
+        ),
         wait_for_downstream: bool = False,
         dag: Optional['DAG'] = None,
         params: Optional[Dict] = None,
@@ -836,6 +839,7 @@ class BaseOperator(AbstractOperator, 
metaclass=BaseOperatorMeta):
 
         self.trigger_rule = trigger_rule
         self.depends_on_past: bool = depends_on_past
+        self.ignore_first_depends_on_past = ignore_first_depends_on_past
         self.wait_for_downstream = wait_for_downstream
         if wait_for_downstream:
             self.depends_on_past = True
diff --git a/airflow/serialization/schema.json 
b/airflow/serialization/schema.json
index 6eab39b..423950b 100644
--- a/airflow/serialization/schema.json
+++ b/airflow/serialization/schema.json
@@ -175,6 +175,7 @@
         "end_date": { "$ref": "#/definitions/datetime" },
         "trigger_rule": { "type": "string" },
         "depends_on_past": { "type": "boolean" },
+        "ignore_first_depends_on_past": { "type": "boolean" },
         "wait_for_downstream": { "type": "boolean" },
         "retries": { "type": "number" },
         "queue": { "type": "string" },
diff --git a/airflow/ti_deps/deps/prev_dagrun_dep.py 
b/airflow/ti_deps/deps/prev_dagrun_dep.py
index eecacae..fb7f7e4 100644
--- a/airflow/ti_deps/deps/prev_dagrun_dep.py
+++ b/airflow/ti_deps/deps/prev_dagrun_dep.py
@@ -15,7 +15,9 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+from sqlalchemy import func
 
+from airflow.models.taskinstance import TaskInstance as TI
 from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
 from airflow.utils.session import provide_session
 from airflow.utils.state import State
@@ -66,6 +68,24 @@ class PrevDagrunDep(BaseTIDep):
 
         previous_ti = last_dagrun.get_task_instance(ti.task_id, 
session=session)
         if not previous_ti:
+            if ti.task.ignore_first_depends_on_past:
+                has_historical_ti = (
+                    session.query(func.count(TI.dag_id))
+                    .filter(
+                        TI.dag_id == ti.dag_id,
+                        TI.task_id == ti.task_id,
+                        TI.execution_date < ti.execution_date,
+                    )
+                    .scalar()
+                    > 0
+                )
+                if not has_historical_ti:
+                    yield self._passing_status(
+                        reason="ignore_first_depends_on_past is true for this 
task "
+                        "and it is the first task instance for its task."
+                    )
+                    return
+
             yield self._failing_status(
                 reason="depends_on_past is true for this task's DAG, but the 
previous "
                 "task instance has not run yet."
diff --git a/tests/serialization/test_dag_serialization.py 
b/tests/serialization/test_dag_serialization.py
index c41daf2..1242443 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -1080,6 +1080,7 @@ class TestStringifiedDAGs:
             '_pre_execute_hook': None,
             '_post_execute_hook': None,
             'depends_on_past': False,
+            'ignore_first_depends_on_past': True,
             'downstream_task_ids': set(),
             'do_xcom_push': True,
             'doc': None,
diff --git a/tests/ti_deps/deps/test_prev_dagrun_dep.py 
b/tests/ti_deps/deps/test_prev_dagrun_dep.py
index 5970b5c..c2ade0a 100644
--- a/tests/ti_deps/deps/test_prev_dagrun_dep.py
+++ b/tests/ti_deps/deps/test_prev_dagrun_dep.py
@@ -26,7 +26,60 @@ from airflow.models.baseoperator import BaseOperator
 from airflow.ti_deps.dep_context import DepContext
 from airflow.ti_deps.deps.prev_dagrun_dep import PrevDagrunDep
 from airflow.utils.state import State
-from airflow.utils.timezone import datetime
+from airflow.utils.timezone import convert_to_utc, datetime
+from airflow.utils.types import DagRunType
+from tests.test_utils.db import clear_db_runs
+
+
+class TestPrevDagrunDep:
+    def teardown_method(self):
+        clear_db_runs()
+
+    def test_first_task_run_of_new_task(self):
+        """
+        The first task run of a new task in an old DAG should pass if the task 
has
+        ignore_first_depends_on_past set to True.
+        """
+        dag = DAG('test_dag')
+        old_task = BaseOperator(
+            task_id='test_task',
+            dag=dag,
+            depends_on_past=True,
+            start_date=convert_to_utc(datetime(2016, 1, 1)),
+            wait_for_downstream=False,
+        )
+        # Old DAG run will include only TaskInstance of old_task
+        dag.create_dagrun(
+            run_id='old_run',
+            state=State.SUCCESS,
+            execution_date=old_task.start_date,
+            run_type=DagRunType.SCHEDULED,
+        )
+
+        new_task = BaseOperator(
+            task_id='new_task',
+            dag=dag,
+            depends_on_past=True,
+            ignore_first_depends_on_past=True,
+            start_date=old_task.start_date,
+        )
+
+        # New DAG run will include 1st TaskInstance of new_task
+        dr = dag.create_dagrun(
+            run_id='new_run',
+            state=State.RUNNING,
+            execution_date=convert_to_utc(datetime(2016, 1, 2)),
+            run_type=DagRunType.SCHEDULED,
+        )
+
+        ti = dr.get_task_instance(new_task.task_id)
+        ti.task = new_task
+
+        # this is important, we need to assert there is no previous_ti of this 
ti
+        assert ti.previous_ti is None
+
+        dep_context = DepContext(ignore_depends_on_past=False)
+        assert PrevDagrunDep().is_met(ti=ti, dep_context=dep_context)
 
 
 @pytest.mark.parametrize(

Reply via email to