This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8e7cb1f Add ignore_first_depends_on_past for scheduled jobs (#22491)
8e7cb1f is described below
commit 8e7cb1f0546382ca9b141603b21850f1d6019ee1
Author: Ping Zhang <[email protected]>
AuthorDate: Thu Mar 24 18:37:54 2022 -0700
Add ignore_first_depends_on_past for scheduled jobs (#22491)
---
airflow/config_templates/config.yml | 11 ++++++
airflow/config_templates/default_airflow.cfg | 7 ++++
airflow/models/baseoperator.py | 4 ++
airflow/serialization/schema.json | 1 +
airflow/ti_deps/deps/prev_dagrun_dep.py | 20 ++++++++++
tests/serialization/test_dag_serialization.py | 1 +
tests/ti_deps/deps/test_prev_dagrun_dep.py | 55 ++++++++++++++++++++++++++-
7 files changed, 98 insertions(+), 1 deletion(-)
diff --git a/airflow/config_templates/config.yml
b/airflow/config_templates/config.yml
index 80bc374..dafdeae 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1949,6 +1949,17 @@
type: string
example: ~
default: "True"
+ - name: ignore_first_depends_on_past_by_default
+ description: |
+ Setting this to True will make the first task instance of a task
+ ignore the depends_on_past setting. A task instance will be considered
+ the first task instance of a task when there is no task instance
+ in the DB with an execution_date earlier than it, i.e. no manual marking
+ of success will be needed for a newly added task to be scheduled.
+ version_added: 2.3.0
+ type: string
+ example: ~
+ default: "True"
- name: max_tis_per_query
description: |
This changes the batch size of queries in the scheduling main loop.
diff --git a/airflow/config_templates/default_airflow.cfg
b/airflow/config_templates/default_airflow.cfg
index 4c5754a..3921d88 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -980,6 +980,13 @@ zombie_detection_interval = 10.0
# DAG definition (catchup)
catchup_by_default = True
+# Setting this to True will make the first task instance of a task
+# ignore the depends_on_past setting. A task instance will be considered
+# the first task instance of a task when there is no task instance
+# in the DB with an execution_date earlier than it, i.e. no manual marking
+# of success will be needed for a newly added task to be scheduled.
+ignore_first_depends_on_past_by_default = True
+
# This changes the batch size of queries in the scheduling main loop.
# If this is too high, SQL query performance may be impacted by
# complexity of query predicate, and/or excessive locking.
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index c3fcbc8..2d29bc9 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -714,6 +714,9 @@ class BaseOperator(AbstractOperator,
metaclass=BaseOperatorMeta):
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
depends_on_past: bool = False,
+ ignore_first_depends_on_past: bool = conf.getboolean(
+ 'scheduler', 'ignore_first_depends_on_past_by_default'
+ ),
wait_for_downstream: bool = False,
dag: Optional['DAG'] = None,
params: Optional[Dict] = None,
@@ -836,6 +839,7 @@ class BaseOperator(AbstractOperator,
metaclass=BaseOperatorMeta):
self.trigger_rule = trigger_rule
self.depends_on_past: bool = depends_on_past
+ self.ignore_first_depends_on_past = ignore_first_depends_on_past
self.wait_for_downstream = wait_for_downstream
if wait_for_downstream:
self.depends_on_past = True
diff --git a/airflow/serialization/schema.json
b/airflow/serialization/schema.json
index 6eab39b..423950b 100644
--- a/airflow/serialization/schema.json
+++ b/airflow/serialization/schema.json
@@ -175,6 +175,7 @@
"end_date": { "$ref": "#/definitions/datetime" },
"trigger_rule": { "type": "string" },
"depends_on_past": { "type": "boolean" },
+ "ignore_first_depends_on_past": { "type": "boolean" },
"wait_for_downstream": { "type": "boolean" },
"retries": { "type": "number" },
"queue": { "type": "string" },
diff --git a/airflow/ti_deps/deps/prev_dagrun_dep.py
b/airflow/ti_deps/deps/prev_dagrun_dep.py
index eecacae..fb7f7e4 100644
--- a/airflow/ti_deps/deps/prev_dagrun_dep.py
+++ b/airflow/ti_deps/deps/prev_dagrun_dep.py
@@ -15,7 +15,9 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+from sqlalchemy import func
+from airflow.models.taskinstance import TaskInstance as TI
from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
from airflow.utils.session import provide_session
from airflow.utils.state import State
@@ -66,6 +68,24 @@ class PrevDagrunDep(BaseTIDep):
previous_ti = last_dagrun.get_task_instance(ti.task_id,
session=session)
if not previous_ti:
+ if ti.task.ignore_first_depends_on_past:
+ has_historical_ti = (
+ session.query(func.count(TI.dag_id))
+ .filter(
+ TI.dag_id == ti.dag_id,
+ TI.task_id == ti.task_id,
+ TI.execution_date < ti.execution_date,
+ )
+ .scalar()
+ > 0
+ )
+ if not has_historical_ti:
+ yield self._passing_status(
+ reason="ignore_first_depends_on_past is true for this task "
+ "and it is the first task instance for its task."
+ )
+ return
+
yield self._failing_status(
reason="depends_on_past is true for this task's DAG, but the previous "
"task instance has not run yet."
diff --git a/tests/serialization/test_dag_serialization.py
b/tests/serialization/test_dag_serialization.py
index c41daf2..1242443 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -1080,6 +1080,7 @@ class TestStringifiedDAGs:
'_pre_execute_hook': None,
'_post_execute_hook': None,
'depends_on_past': False,
+ 'ignore_first_depends_on_past': True,
'downstream_task_ids': set(),
'do_xcom_push': True,
'doc': None,
diff --git a/tests/ti_deps/deps/test_prev_dagrun_dep.py
b/tests/ti_deps/deps/test_prev_dagrun_dep.py
index 5970b5c..c2ade0a 100644
--- a/tests/ti_deps/deps/test_prev_dagrun_dep.py
+++ b/tests/ti_deps/deps/test_prev_dagrun_dep.py
@@ -26,7 +26,60 @@ from airflow.models.baseoperator import BaseOperator
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.deps.prev_dagrun_dep import PrevDagrunDep
from airflow.utils.state import State
-from airflow.utils.timezone import datetime
+from airflow.utils.timezone import convert_to_utc, datetime
+from airflow.utils.types import DagRunType
+from tests.test_utils.db import clear_db_runs
+
+
+class TestPrevDagrunDep:
+ def teardown_method(self):
+ clear_db_runs()
+
+ def test_first_task_run_of_new_task(self):
+ """
+ The first task run of a new task in an old DAG should pass if the task has
+ ignore_first_depends_on_past set to True.
+ """
+ dag = DAG('test_dag')
+ old_task = BaseOperator(
+ task_id='test_task',
+ dag=dag,
+ depends_on_past=True,
+ start_date=convert_to_utc(datetime(2016, 1, 1)),
+ wait_for_downstream=False,
+ )
+ # Old DAG run will include only TaskInstance of old_task
+ dag.create_dagrun(
+ run_id='old_run',
+ state=State.SUCCESS,
+ execution_date=old_task.start_date,
+ run_type=DagRunType.SCHEDULED,
+ )
+
+ new_task = BaseOperator(
+ task_id='new_task',
+ dag=dag,
+ depends_on_past=True,
+ ignore_first_depends_on_past=True,
+ start_date=old_task.start_date,
+ )
+
+ # New DAG run will include 1st TaskInstance of new_task
+ dr = dag.create_dagrun(
+ run_id='new_run',
+ state=State.RUNNING,
+ execution_date=convert_to_utc(datetime(2016, 1, 2)),
+ run_type=DagRunType.SCHEDULED,
+ )
+
+ ti = dr.get_task_instance(new_task.task_id)
+ ti.task = new_task
+
+ # this is important, we need to assert there is no previous_ti of this ti
+ assert ti.previous_ti is None
+
+ dep_context = DepContext(ignore_depends_on_past=False)
+ assert PrevDagrunDep().is_met(ti=ti, dep_context=dep_context)
@pytest.mark.parametrize(