This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c946fc3f0b `TriggerDagRunOperator` depreacte `exection_date` in favor
of `logical_date` (#39285)
c946fc3f0b is described below
commit c946fc3f0b4c55bd6fbf9a49950d6e24980b4abe
Author: Felipe Lolas <[email protected]>
AuthorDate: Sat Apr 27 11:46:54 2024 -0400
`TriggerDagRunOperator` depreacte `exection_date` in favor of
`logical_date` (#39285)
* added logical_date parameter
* fix comment
---
airflow/operators/trigger_dagrun.py | 62 ++++++++------
tests/operators/test_trigger_dagrun.py | 148 ++++++++++++++++++---------------
2 files changed, 120 insertions(+), 90 deletions(-)
diff --git a/airflow/operators/trigger_dagrun.py
b/airflow/operators/trigger_dagrun.py
index ab74d4c862..f8cfa5256a 100644
--- a/airflow/operators/trigger_dagrun.py
+++ b/airflow/operators/trigger_dagrun.py
@@ -20,6 +20,7 @@ from __future__ import annotations
import datetime
import json
import time
+import warnings
from typing import TYPE_CHECKING, Any, Sequence, cast
from sqlalchemy import select
@@ -27,7 +28,7 @@ from sqlalchemy.orm.exc import NoResultFound
from airflow.api.common.trigger_dag import trigger_dag
from airflow.configuration import conf
-from airflow.exceptions import AirflowException, DagNotFound,
DagRunAlreadyExists
+from airflow.exceptions import AirflowException, DagNotFound,
DagRunAlreadyExists, RemovedInAirflow3Warning
from airflow.models.baseoperator import BaseOperator
from airflow.models.baseoperatorlink import BaseOperatorLink
from airflow.models.dag import DagModel
@@ -41,7 +42,7 @@ from airflow.utils.session import provide_session
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunType
-XCOM_EXECUTION_DATE_ISO = "trigger_execution_date_iso"
+XCOM_LOGICAL_DATE_ISO = "trigger_logical_date_iso"
XCOM_RUN_ID = "trigger_run_id"
@@ -64,7 +65,7 @@ class TriggerDagRunLink(BaseOperatorLink):
def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey) ->
str:
# Fetch the correct execution date for the triggerED dag which is
# stored in xcom during execution of the triggerING task.
- when = XCom.get_value(ti_key=ti_key, key=XCOM_EXECUTION_DATE_ISO)
+ when = XCom.get_value(ti_key=ti_key, key=XCOM_LOGICAL_DATE_ISO)
query = {"dag_id": cast(TriggerDagRunOperator,
operator).trigger_dag_id, "base_date": when}
return build_airflow_url_with_query(query)
@@ -77,7 +78,7 @@ class TriggerDagRunOperator(BaseOperator):
:param trigger_run_id: The run ID to use for the triggered DAG run
(templated).
If not provided, a run ID will be automatically generated.
:param conf: Configuration for the DAG run (templated).
- :param execution_date: Execution date for the dag (templated).
+ :param logical_date: Logical date for the dag (templated).
:param reset_dag_run: Whether clear existing dag run if already exists.
This is useful when backfill or rerun an existing dag run.
This only resets (not recreates) the dag run.
@@ -91,12 +92,13 @@ class TriggerDagRunOperator(BaseOperator):
:param failed_states: List of failed or dis-allowed states, default is
``None``.
:param deferrable: If waiting for completion, whether or not to defer the
task until done,
default is ``False``.
+ :param execution_date: Deprecated parameter; same as ``logical_date``.
"""
template_fields: Sequence[str] = (
"trigger_dag_id",
"trigger_run_id",
- "execution_date",
+ "logical_date",
"conf",
"wait_for_completion",
)
@@ -110,13 +112,14 @@ class TriggerDagRunOperator(BaseOperator):
trigger_dag_id: str,
trigger_run_id: str | None = None,
conf: dict | None = None,
- execution_date: str | datetime.datetime | None = None,
+ logical_date: str | datetime.datetime | None = None,
reset_dag_run: bool = False,
wait_for_completion: bool = False,
poke_interval: int = 60,
allowed_states: list[str] | None = None,
failed_states: list[str] | None = None,
deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
+ execution_date: str | datetime.datetime | None = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
@@ -136,20 +139,29 @@ class TriggerDagRunOperator(BaseOperator):
self.failed_states = [DagRunState.FAILED]
self._defer = deferrable
- if execution_date is not None and not isinstance(execution_date, (str,
datetime.datetime)):
+ if execution_date is not None:
+ warnings.warn(
+ "Parameter 'execution_date' is deprecated. Use 'logical_date'
instead.",
+ RemovedInAirflow3Warning,
+ stacklevel=2,
+ )
+ logical_date = execution_date
+
+ if logical_date is not None and not isinstance(logical_date, (str,
datetime.datetime)):
+ type_name = type(logical_date).__name__
raise TypeError(
- f"Expected str or datetime.datetime type for
execution_date.Got {type(execution_date)}"
+ f"Expected str or datetime.datetime type for parameter
'logical_date'. Got {type_name}"
)
- self.execution_date = execution_date
+ self.logical_date = logical_date
def execute(self, context: Context):
- if isinstance(self.execution_date, datetime.datetime):
- parsed_execution_date = self.execution_date
- elif isinstance(self.execution_date, str):
- parsed_execution_date = timezone.parse(self.execution_date)
+ if isinstance(self.logical_date, datetime.datetime):
+ parsed_logical_date = self.logical_date
+ elif isinstance(self.logical_date, str):
+ parsed_logical_date = timezone.parse(self.logical_date)
else:
- parsed_execution_date = timezone.utcnow()
+ parsed_logical_date = timezone.utcnow()
try:
json.dumps(self.conf)
@@ -159,20 +171,20 @@ class TriggerDagRunOperator(BaseOperator):
if self.trigger_run_id:
run_id = str(self.trigger_run_id)
else:
- run_id = DagRun.generate_run_id(DagRunType.MANUAL,
parsed_execution_date)
+ run_id = DagRun.generate_run_id(DagRunType.MANUAL,
parsed_logical_date)
try:
dag_run = trigger_dag(
dag_id=self.trigger_dag_id,
run_id=run_id,
conf=self.conf,
- execution_date=parsed_execution_date,
+ execution_date=parsed_logical_date,
replace_microseconds=False,
)
except DagRunAlreadyExists as e:
if self.reset_dag_run:
- self.log.info("Clearing %s on %s", self.trigger_dag_id,
parsed_execution_date)
+ self.log.info("Clearing %s on %s", self.trigger_dag_id,
parsed_logical_date)
# Get target dag object and call clear()
dag_model = DagModel.get_current(self.trigger_dag_id)
@@ -182,7 +194,7 @@ class TriggerDagRunOperator(BaseOperator):
dag_bag = DagBag(dag_folder=dag_model.fileloc,
read_dags_from_db=True)
dag = dag_bag.get_dag(self.trigger_dag_id)
dag_run = e.dag_run
- dag.clear(start_date=dag_run.execution_date,
end_date=dag_run.execution_date)
+ dag.clear(start_date=dag_run.logical_date,
end_date=dag_run.logical_date)
else:
raise e
if dag_run is None:
@@ -190,7 +202,7 @@ class TriggerDagRunOperator(BaseOperator):
# Store the execution date from the dag run (either created or found
above) to
# be used when creating the extra link on the webserver.
ti = context["task_instance"]
- ti.xcom_push(key=XCOM_EXECUTION_DATE_ISO,
value=dag_run.execution_date.isoformat())
+ ti.xcom_push(key=XCOM_LOGICAL_DATE_ISO,
value=dag_run.logical_date.isoformat())
ti.xcom_push(key=XCOM_RUN_ID, value=dag_run.run_id)
if self.wait_for_completion:
@@ -200,7 +212,7 @@ class TriggerDagRunOperator(BaseOperator):
trigger=DagStateTrigger(
dag_id=self.trigger_dag_id,
states=self.allowed_states + self.failed_states,
- execution_dates=[parsed_execution_date],
+ execution_dates=[parsed_logical_date],
poll_interval=self.poke_interval,
),
method_name="execute_complete",
@@ -210,7 +222,7 @@ class TriggerDagRunOperator(BaseOperator):
self.log.info(
"Waiting for %s on %s to become allowed state %s ...",
self.trigger_dag_id,
- dag_run.execution_date,
+ dag_run.logical_date,
self.allowed_states,
)
time.sleep(self.poke_interval)
@@ -225,17 +237,17 @@ class TriggerDagRunOperator(BaseOperator):
@provide_session
def execute_complete(self, context: Context, session: Session, event:
tuple[str, dict[str, Any]]):
- # This execution date is parsed from the return trigger event
- provided_execution_date = event[1]["execution_dates"][0]
+ # This logical_date is parsed from the return trigger event
+ provided_logical_date = event[1]["execution_dates"][0]
try:
dag_run = session.execute(
select(DagRun).where(
- DagRun.dag_id == self.trigger_dag_id,
DagRun.execution_date == provided_execution_date
+ DagRun.dag_id == self.trigger_dag_id,
DagRun.execution_date == provided_logical_date
)
).scalar_one()
except NoResultFound:
raise AirflowException(
- f"No DAG run found for DAG {self.trigger_dag_id} and execution
date {self.execution_date}"
+ f"No DAG run found for DAG {self.trigger_dag_id} and logical
date {self.logical_date}"
)
state = dag_run.state
diff --git a/tests/operators/test_trigger_dagrun.py
b/tests/operators/test_trigger_dagrun.py
index a90f49926e..9eed9b786e 100644
--- a/tests/operators/test_trigger_dagrun.py
+++ b/tests/operators/test_trigger_dagrun.py
@@ -110,7 +110,7 @@ class TestDagRunOperator:
args, _ = mock_build_url.call_args
expected_args = {
"dag_id": triggered_dag_run.dag_id,
- "base_date": triggered_dag_run.execution_date.isoformat(),
+ "base_date": triggered_dag_run.logical_date.isoformat(),
}
assert expected_args in args
@@ -122,7 +122,7 @@ class TestDagRunOperator:
with create_session() as session:
dagrun = session.query(DagRun).filter(DagRun.dag_id ==
TRIGGERED_DAG_ID).one()
assert dagrun.external_trigger
- assert dagrun.run_id == DagRun.generate_run_id(DagRunType.MANUAL,
dagrun.execution_date)
+ assert dagrun.run_id == DagRun.generate_run_id(DagRunType.MANUAL,
dagrun.logical_date)
self.assert_extra_link(dagrun, task, session)
def test_trigger_dagrun_custom_run_id(self):
@@ -139,13 +139,13 @@ class TestDagRunOperator:
assert len(dagruns) == 1
assert dagruns[0].run_id == "custom_run_id"
- def test_trigger_dagrun_with_execution_date(self):
- """Test TriggerDagRunOperator with custom execution_date."""
- custom_execution_date = timezone.datetime(2021, 1, 2, 3, 4, 5)
+ def test_trigger_dagrun_with_logical_date(self):
+ """Test TriggerDagRunOperator with custom logical_date."""
+ custom_logical_date = timezone.datetime(2021, 1, 2, 3, 4, 5)
task = TriggerDagRunOperator(
- task_id="test_trigger_dagrun_with_execution_date",
+ task_id="test_trigger_dagrun_with_logical_date",
trigger_dag_id=TRIGGERED_DAG_ID,
- execution_date=custom_execution_date,
+ logical_date=custom_logical_date,
dag=self.dag,
)
task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
ignore_ti_state=True)
@@ -153,17 +153,17 @@ class TestDagRunOperator:
with create_session() as session:
dagrun = session.query(DagRun).filter(DagRun.dag_id ==
TRIGGERED_DAG_ID).one()
assert dagrun.external_trigger
- assert dagrun.execution_date == custom_execution_date
- assert dagrun.run_id == DagRun.generate_run_id(DagRunType.MANUAL,
custom_execution_date)
+ assert dagrun.logical_date == custom_logical_date
+ assert dagrun.run_id == DagRun.generate_run_id(DagRunType.MANUAL,
custom_logical_date)
self.assert_extra_link(dagrun, task, session)
def test_trigger_dagrun_twice(self):
- """Test TriggerDagRunOperator with custom execution_date."""
+ """Test TriggerDagRunOperator with custom logical_date."""
utc_now = timezone.utcnow()
task = TriggerDagRunOperator(
- task_id="test_trigger_dagrun_with_execution_date",
+ task_id="test_trigger_dagrun_with_logical_date",
trigger_dag_id=TRIGGERED_DAG_ID,
- execution_date=utc_now,
+ logical_date=utc_now,
dag=self.dag,
poke_interval=1,
reset_dag_run=True,
@@ -186,16 +186,16 @@ class TestDagRunOperator:
assert len(dagruns) == 1
triggered_dag_run = dagruns[0]
assert triggered_dag_run.external_trigger
- assert triggered_dag_run.execution_date == utc_now
+ assert triggered_dag_run.logical_date == utc_now
self.assert_extra_link(triggered_dag_run, task, session)
def test_trigger_dagrun_with_scheduled_dag_run(self):
- """Test TriggerDagRunOperator with custom execution_date and scheduled
dag_run."""
+ """Test TriggerDagRunOperator with custom logical_date and scheduled
dag_run."""
utc_now = timezone.utcnow()
task = TriggerDagRunOperator(
- task_id="test_trigger_dagrun_with_execution_date",
+ task_id="test_trigger_dagrun_with_logical_date",
trigger_dag_id=TRIGGERED_DAG_ID,
- execution_date=utc_now,
+ logical_date=utc_now,
dag=self.dag,
poke_interval=1,
reset_dag_run=True,
@@ -218,15 +218,15 @@ class TestDagRunOperator:
assert len(dagruns) == 1
triggered_dag_run = dagruns[0]
assert triggered_dag_run.external_trigger
- assert triggered_dag_run.execution_date == utc_now
+ assert triggered_dag_run.logical_date == utc_now
self.assert_extra_link(triggered_dag_run, task, session)
- def test_trigger_dagrun_with_templated_execution_date(self):
- """Test TriggerDagRunOperator with templated execution_date."""
+ def test_trigger_dagrun_with_templated_logical_date(self):
+ """Test TriggerDagRunOperator with templated logical_date."""
task = TriggerDagRunOperator(
- task_id="test_trigger_dagrun_with_str_execution_date",
+ task_id="test_trigger_dagrun_with_str_logical_date",
trigger_dag_id=TRIGGERED_DAG_ID,
- execution_date="{{ logical_date }}",
+ logical_date="{{ logical_date }}",
dag=self.dag,
)
task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
ignore_ti_state=True)
@@ -236,13 +236,13 @@ class TestDagRunOperator:
assert len(dagruns) == 1
triggered_dag_run = dagruns[0]
assert triggered_dag_run.external_trigger
- assert triggered_dag_run.execution_date == DEFAULT_DATE
+ assert triggered_dag_run.logical_date == DEFAULT_DATE
self.assert_extra_link(triggered_dag_run, task, session)
def test_trigger_dagrun_operator_conf(self):
"""Test passing conf to the triggered DagRun."""
task = TriggerDagRunOperator(
- task_id="test_trigger_dagrun_with_str_execution_date",
+ task_id="test_trigger_dagrun_with_str_logical_date",
trigger_dag_id=TRIGGERED_DAG_ID,
conf={"foo": "bar"},
dag=self.dag,
@@ -268,7 +268,7 @@ class TestDagRunOperator:
def test_trigger_dagrun_operator_templated_conf(self):
"""Test passing a templated conf to the triggered DagRun."""
task = TriggerDagRunOperator(
- task_id="test_trigger_dagrun_with_str_execution_date",
+ task_id="test_trigger_dagrun_with_str_logical_date",
trigger_dag_id=TRIGGERED_DAG_ID,
conf={"foo": "{{ dag.dag_id }}"},
dag=self.dag,
@@ -282,48 +282,48 @@ class TestDagRunOperator:
def test_trigger_dagrun_with_reset_dag_run_false(self):
"""Test TriggerDagRunOperator without reset_dag_run."""
- execution_date = DEFAULT_DATE
+ logical_date = DEFAULT_DATE
task = TriggerDagRunOperator(
task_id="test_task",
trigger_dag_id=TRIGGERED_DAG_ID,
trigger_run_id=None,
- execution_date=None,
+ logical_date=None,
reset_dag_run=False,
dag=self.dag,
)
- task.run(start_date=execution_date, end_date=execution_date,
ignore_ti_state=True)
- task.run(start_date=execution_date, end_date=execution_date,
ignore_ti_state=True)
+ task.run(start_date=logical_date, end_date=logical_date,
ignore_ti_state=True)
+ task.run(start_date=logical_date, end_date=logical_date,
ignore_ti_state=True)
with create_session() as session:
dagruns = session.query(DagRun).filter(DagRun.dag_id ==
TRIGGERED_DAG_ID).all()
assert len(dagruns) == 2
@pytest.mark.parametrize(
- "trigger_run_id, trigger_execution_date",
+ "trigger_run_id, trigger_logical_date",
[
(None, DEFAULT_DATE),
("dummy_run_id", None),
("dummy_run_id", DEFAULT_DATE),
],
)
- def test_trigger_dagrun_with_reset_dag_run_false_fail(self,
trigger_run_id, trigger_execution_date):
+ def test_trigger_dagrun_with_reset_dag_run_false_fail(self,
trigger_run_id, trigger_logical_date):
"""Test TriggerDagRunOperator without reset_dag_run but triggered dag
fails."""
- execution_date = DEFAULT_DATE
+ logical_date = DEFAULT_DATE
task = TriggerDagRunOperator(
task_id="test_task",
trigger_dag_id=TRIGGERED_DAG_ID,
trigger_run_id=trigger_run_id,
- execution_date=trigger_execution_date,
+ logical_date=trigger_logical_date,
reset_dag_run=False,
dag=self.dag,
)
- task.run(start_date=execution_date, end_date=execution_date,
ignore_ti_state=True)
+ task.run(start_date=logical_date, end_date=logical_date,
ignore_ti_state=True)
with pytest.raises(DagRunAlreadyExists):
- task.run(start_date=execution_date, end_date=execution_date,
ignore_ti_state=True)
+ task.run(start_date=logical_date, end_date=logical_date,
ignore_ti_state=True)
@pytest.mark.parametrize(
- "trigger_run_id, trigger_execution_date, expected_dagruns_count",
+ "trigger_run_id, trigger_logical_date, expected_dagruns_count",
[
(None, DEFAULT_DATE, 1),
(None, None, 2),
@@ -332,20 +332,20 @@ class TestDagRunOperator:
],
)
def test_trigger_dagrun_with_reset_dag_run_true(
- self, trigger_run_id, trigger_execution_date, expected_dagruns_count
+ self, trigger_run_id, trigger_logical_date, expected_dagruns_count
):
"""Test TriggerDagRunOperator with reset_dag_run."""
- execution_date = DEFAULT_DATE
+ logical_date = DEFAULT_DATE
task = TriggerDagRunOperator(
task_id="test_task",
trigger_dag_id=TRIGGERED_DAG_ID,
trigger_run_id=trigger_run_id,
- execution_date=trigger_execution_date,
+ logical_date=trigger_logical_date,
reset_dag_run=True,
dag=self.dag,
)
- task.run(start_date=execution_date, end_date=execution_date,
ignore_ti_state=True)
- task.run(start_date=execution_date, end_date=execution_date,
ignore_ti_state=True)
+ task.run(start_date=logical_date, end_date=logical_date,
ignore_ti_state=True)
+ task.run(start_date=logical_date, end_date=logical_date,
ignore_ti_state=True)
with create_session() as session:
dag_runs = session.query(DagRun).filter(DagRun.dag_id ==
TRIGGERED_DAG_ID).all()
@@ -354,17 +354,17 @@ class TestDagRunOperator:
def test_trigger_dagrun_with_wait_for_completion_true(self):
"""Test TriggerDagRunOperator with wait_for_completion."""
- execution_date = DEFAULT_DATE
+ logical_date = DEFAULT_DATE
task = TriggerDagRunOperator(
task_id="test_task",
trigger_dag_id=TRIGGERED_DAG_ID,
- execution_date=execution_date,
+ logical_date=logical_date,
wait_for_completion=True,
poke_interval=10,
allowed_states=[State.QUEUED],
dag=self.dag,
)
- task.run(start_date=execution_date, end_date=execution_date)
+ task.run(start_date=logical_date, end_date=logical_date)
with create_session() as session:
dagruns = session.query(DagRun).filter(DagRun.dag_id ==
TRIGGERED_DAG_ID).all()
@@ -372,28 +372,28 @@ class TestDagRunOperator:
def test_trigger_dagrun_with_wait_for_completion_true_fail(self):
"""Test TriggerDagRunOperator with wait_for_completion but triggered
dag fails."""
- execution_date = DEFAULT_DATE
+ logical_date = DEFAULT_DATE
task = TriggerDagRunOperator(
task_id="test_task",
trigger_dag_id=TRIGGERED_DAG_ID,
- execution_date=execution_date,
+ logical_date=logical_date,
wait_for_completion=True,
poke_interval=10,
failed_states=[State.QUEUED],
dag=self.dag,
)
with pytest.raises(AirflowException):
- task.run(start_date=execution_date, end_date=execution_date)
+ task.run(start_date=logical_date, end_date=logical_date)
def test_trigger_dagrun_triggering_itself(self):
"""Test TriggerDagRunOperator that triggers itself"""
- execution_date = DEFAULT_DATE
+ logical_date = DEFAULT_DATE
task = TriggerDagRunOperator(
task_id="test_task",
trigger_dag_id=self.dag.dag_id,
dag=self.dag,
)
- task.run(start_date=execution_date, end_date=execution_date)
+ task.run(start_date=logical_date, end_date=logical_date)
with create_session() as session:
dagruns = (
@@ -407,33 +407,33 @@ class TestDagRunOperator:
assert triggered_dag_run.state == State.QUEUED
self.assert_extra_link(triggered_dag_run, task, session)
- def test_trigger_dagrun_triggering_itself_with_execution_date(self):
- """Test TriggerDagRunOperator that triggers itself with execution date,
+ def test_trigger_dagrun_triggering_itself_with_logical_date(self):
+ """Test TriggerDagRunOperator that triggers itself with logical date,
fails with DagRunAlreadyExists"""
- execution_date = DEFAULT_DATE
+ logical_date = DEFAULT_DATE
task = TriggerDagRunOperator(
task_id="test_task",
trigger_dag_id=self.dag.dag_id,
- execution_date=execution_date,
+ logical_date=logical_date,
dag=self.dag,
)
with pytest.raises(DagRunAlreadyExists):
- task.run(start_date=execution_date, end_date=execution_date)
+ task.run(start_date=logical_date, end_date=logical_date)
def test_trigger_dagrun_with_wait_for_completion_true_defer_false(self):
"""Test TriggerDagRunOperator with wait_for_completion."""
- execution_date = DEFAULT_DATE
+ logical_date = DEFAULT_DATE
task = TriggerDagRunOperator(
task_id="test_task",
trigger_dag_id=TRIGGERED_DAG_ID,
- execution_date=execution_date,
+ logical_date=logical_date,
wait_for_completion=True,
poke_interval=10,
allowed_states=[State.QUEUED],
deferrable=False,
dag=self.dag,
)
- task.run(start_date=execution_date, end_date=execution_date)
+ task.run(start_date=logical_date, end_date=logical_date)
with create_session() as session:
dagruns = session.query(DagRun).filter(DagRun.dag_id ==
TRIGGERED_DAG_ID).all()
@@ -441,11 +441,11 @@ class TestDagRunOperator:
def test_trigger_dagrun_with_wait_for_completion_true_defer_true(self):
"""Test TriggerDagRunOperator with wait_for_completion."""
- execution_date = DEFAULT_DATE
+ logical_date = DEFAULT_DATE
task = TriggerDagRunOperator(
task_id="test_task",
trigger_dag_id=TRIGGERED_DAG_ID,
- execution_date=execution_date,
+ logical_date=logical_date,
wait_for_completion=True,
poke_interval=10,
allowed_states=[State.QUEUED],
@@ -453,7 +453,7 @@ class TestDagRunOperator:
dag=self.dag,
)
- task.run(start_date=execution_date, end_date=execution_date)
+ task.run(start_date=logical_date, end_date=logical_date)
with create_session() as session:
dagruns = session.query(DagRun).filter(DagRun.dag_id ==
TRIGGERED_DAG_ID).all()
@@ -469,11 +469,11 @@ class TestDagRunOperator:
def
test_trigger_dagrun_with_wait_for_completion_true_defer_true_failure(self):
"""Test TriggerDagRunOperator wait_for_completion dag run in non
defined state."""
- execution_date = DEFAULT_DATE
+ logical_date = DEFAULT_DATE
task = TriggerDagRunOperator(
task_id="test_task",
trigger_dag_id=TRIGGERED_DAG_ID,
- execution_date=execution_date,
+ logical_date=logical_date,
wait_for_completion=True,
poke_interval=10,
allowed_states=[State.SUCCESS],
@@ -481,7 +481,7 @@ class TestDagRunOperator:
dag=self.dag,
)
- task.run(start_date=execution_date, end_date=execution_date)
+ task.run(start_date=logical_date, end_date=logical_date)
with create_session() as session:
dagruns = session.query(DagRun).filter(DagRun.dag_id ==
TRIGGERED_DAG_ID).all()
@@ -501,11 +501,11 @@ class TestDagRunOperator:
def
test_trigger_dagrun_with_wait_for_completion_true_defer_true_failure_2(self):
"""Test TriggerDagRunOperator wait_for_completion dag run in failed
state."""
- execution_date = DEFAULT_DATE
+ logical_date = DEFAULT_DATE
task = TriggerDagRunOperator(
task_id="test_task",
trigger_dag_id=TRIGGERED_DAG_ID,
- execution_date=execution_date,
+ logical_date=logical_date,
wait_for_completion=True,
poke_interval=10,
allowed_states=[State.SUCCESS],
@@ -514,7 +514,7 @@ class TestDagRunOperator:
dag=self.dag,
)
- task.run(start_date=execution_date, end_date=execution_date)
+ task.run(start_date=logical_date, end_date=logical_date)
with create_session() as session:
dagruns = session.query(DagRun).filter(DagRun.dag_id ==
TRIGGERED_DAG_ID).all()
@@ -529,3 +529,21 @@ class TestDagRunOperator:
with pytest.raises(AirflowException, match="failed with failed state"):
task.execute_complete(context={}, event=trigger.serialize())
+
+ def test_trigger_dagrun_with_execution_date(self):
+ """Test TriggerDagRunOperator with custom execution_date (deprecated
parameter)"""
+ custom_execution_date = timezone.datetime(2021, 1, 2, 3, 4, 5)
+ task = TriggerDagRunOperator(
+ task_id="test_trigger_dagrun_with_execution_date",
+ trigger_dag_id=TRIGGERED_DAG_ID,
+ execution_date=custom_execution_date,
+ dag=self.dag,
+ )
+ task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
ignore_ti_state=True)
+
+ with create_session() as session:
+ dagrun = session.query(DagRun).filter(DagRun.dag_id ==
TRIGGERED_DAG_ID).one()
+ assert dagrun.external_trigger
+ assert dagrun.logical_date == custom_execution_date
+ assert dagrun.run_id == DagRun.generate_run_id(DagRunType.MANUAL,
custom_execution_date)
+ self.assert_extra_link(dagrun, task, session)