This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c946fc3f0b `TriggerDagRunOperator` depreacte `exection_date` in favor 
of `logical_date` (#39285)
c946fc3f0b is described below

commit c946fc3f0b4c55bd6fbf9a49950d6e24980b4abe
Author: Felipe Lolas <[email protected]>
AuthorDate: Sat Apr 27 11:46:54 2024 -0400

    `TriggerDagRunOperator` depreacte `exection_date` in favor of 
`logical_date` (#39285)
    
    * added logical_date parameter
    
    * fix comment
---
 airflow/operators/trigger_dagrun.py    |  62 ++++++++------
 tests/operators/test_trigger_dagrun.py | 148 ++++++++++++++++++---------------
 2 files changed, 120 insertions(+), 90 deletions(-)

diff --git a/airflow/operators/trigger_dagrun.py 
b/airflow/operators/trigger_dagrun.py
index ab74d4c862..f8cfa5256a 100644
--- a/airflow/operators/trigger_dagrun.py
+++ b/airflow/operators/trigger_dagrun.py
@@ -20,6 +20,7 @@ from __future__ import annotations
 import datetime
 import json
 import time
+import warnings
 from typing import TYPE_CHECKING, Any, Sequence, cast
 
 from sqlalchemy import select
@@ -27,7 +28,7 @@ from sqlalchemy.orm.exc import NoResultFound
 
 from airflow.api.common.trigger_dag import trigger_dag
 from airflow.configuration import conf
-from airflow.exceptions import AirflowException, DagNotFound, 
DagRunAlreadyExists
+from airflow.exceptions import AirflowException, DagNotFound, 
DagRunAlreadyExists, RemovedInAirflow3Warning
 from airflow.models.baseoperator import BaseOperator
 from airflow.models.baseoperatorlink import BaseOperatorLink
 from airflow.models.dag import DagModel
@@ -41,7 +42,7 @@ from airflow.utils.session import provide_session
 from airflow.utils.state import DagRunState
 from airflow.utils.types import DagRunType
 
-XCOM_EXECUTION_DATE_ISO = "trigger_execution_date_iso"
+XCOM_LOGICAL_DATE_ISO = "trigger_logical_date_iso"
 XCOM_RUN_ID = "trigger_run_id"
 
 
@@ -64,7 +65,7 @@ class TriggerDagRunLink(BaseOperatorLink):
     def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey) -> 
str:
         # Fetch the correct execution date for the triggerED dag which is
         # stored in xcom during execution of the triggerING task.
-        when = XCom.get_value(ti_key=ti_key, key=XCOM_EXECUTION_DATE_ISO)
+        when = XCom.get_value(ti_key=ti_key, key=XCOM_LOGICAL_DATE_ISO)
         query = {"dag_id": cast(TriggerDagRunOperator, 
operator).trigger_dag_id, "base_date": when}
         return build_airflow_url_with_query(query)
 
@@ -77,7 +78,7 @@ class TriggerDagRunOperator(BaseOperator):
     :param trigger_run_id: The run ID to use for the triggered DAG run 
(templated).
         If not provided, a run ID will be automatically generated.
     :param conf: Configuration for the DAG run (templated).
-    :param execution_date: Execution date for the dag (templated).
+    :param logical_date: Logical date for the dag (templated).
     :param reset_dag_run: Whether clear existing dag run if already exists.
         This is useful when backfill or rerun an existing dag run.
         This only resets (not recreates) the dag run.
@@ -91,12 +92,13 @@ class TriggerDagRunOperator(BaseOperator):
     :param failed_states: List of failed or dis-allowed states, default is 
``None``.
     :param deferrable: If waiting for completion, whether or not to defer the 
task until done,
         default is ``False``.
+    :param execution_date: Deprecated parameter; same as ``logical_date``.
     """
 
     template_fields: Sequence[str] = (
         "trigger_dag_id",
         "trigger_run_id",
-        "execution_date",
+        "logical_date",
         "conf",
         "wait_for_completion",
     )
@@ -110,13 +112,14 @@ class TriggerDagRunOperator(BaseOperator):
         trigger_dag_id: str,
         trigger_run_id: str | None = None,
         conf: dict | None = None,
-        execution_date: str | datetime.datetime | None = None,
+        logical_date: str | datetime.datetime | None = None,
         reset_dag_run: bool = False,
         wait_for_completion: bool = False,
         poke_interval: int = 60,
         allowed_states: list[str] | None = None,
         failed_states: list[str] | None = None,
         deferrable: bool = conf.getboolean("operators", "default_deferrable", 
fallback=False),
+        execution_date: str | datetime.datetime | None = None,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
@@ -136,20 +139,29 @@ class TriggerDagRunOperator(BaseOperator):
             self.failed_states = [DagRunState.FAILED]
         self._defer = deferrable
 
-        if execution_date is not None and not isinstance(execution_date, (str, 
datetime.datetime)):
+        if execution_date is not None:
+            warnings.warn(
+                "Parameter 'execution_date' is deprecated. Use 'logical_date' 
instead.",
+                RemovedInAirflow3Warning,
+                stacklevel=2,
+            )
+            logical_date = execution_date
+
+        if logical_date is not None and not isinstance(logical_date, (str, 
datetime.datetime)):
+            type_name = type(logical_date).__name__
             raise TypeError(
-                f"Expected str or datetime.datetime type for 
execution_date.Got {type(execution_date)}"
+                f"Expected str or datetime.datetime type for parameter 
'logical_date'. Got {type_name}"
             )
 
-        self.execution_date = execution_date
+        self.logical_date = logical_date
 
     def execute(self, context: Context):
-        if isinstance(self.execution_date, datetime.datetime):
-            parsed_execution_date = self.execution_date
-        elif isinstance(self.execution_date, str):
-            parsed_execution_date = timezone.parse(self.execution_date)
+        if isinstance(self.logical_date, datetime.datetime):
+            parsed_logical_date = self.logical_date
+        elif isinstance(self.logical_date, str):
+            parsed_logical_date = timezone.parse(self.logical_date)
         else:
-            parsed_execution_date = timezone.utcnow()
+            parsed_logical_date = timezone.utcnow()
 
         try:
             json.dumps(self.conf)
@@ -159,20 +171,20 @@ class TriggerDagRunOperator(BaseOperator):
         if self.trigger_run_id:
             run_id = str(self.trigger_run_id)
         else:
-            run_id = DagRun.generate_run_id(DagRunType.MANUAL, 
parsed_execution_date)
+            run_id = DagRun.generate_run_id(DagRunType.MANUAL, 
parsed_logical_date)
 
         try:
             dag_run = trigger_dag(
                 dag_id=self.trigger_dag_id,
                 run_id=run_id,
                 conf=self.conf,
-                execution_date=parsed_execution_date,
+                execution_date=parsed_logical_date,
                 replace_microseconds=False,
             )
 
         except DagRunAlreadyExists as e:
             if self.reset_dag_run:
-                self.log.info("Clearing %s on %s", self.trigger_dag_id, 
parsed_execution_date)
+                self.log.info("Clearing %s on %s", self.trigger_dag_id, 
parsed_logical_date)
 
                 # Get target dag object and call clear()
                 dag_model = DagModel.get_current(self.trigger_dag_id)
@@ -182,7 +194,7 @@ class TriggerDagRunOperator(BaseOperator):
                 dag_bag = DagBag(dag_folder=dag_model.fileloc, 
read_dags_from_db=True)
                 dag = dag_bag.get_dag(self.trigger_dag_id)
                 dag_run = e.dag_run
-                dag.clear(start_date=dag_run.execution_date, 
end_date=dag_run.execution_date)
+                dag.clear(start_date=dag_run.logical_date, 
end_date=dag_run.logical_date)
             else:
                 raise e
         if dag_run is None:
@@ -190,7 +202,7 @@ class TriggerDagRunOperator(BaseOperator):
         # Store the execution date from the dag run (either created or found 
above) to
         # be used when creating the extra link on the webserver.
         ti = context["task_instance"]
-        ti.xcom_push(key=XCOM_EXECUTION_DATE_ISO, 
value=dag_run.execution_date.isoformat())
+        ti.xcom_push(key=XCOM_LOGICAL_DATE_ISO, 
value=dag_run.logical_date.isoformat())
         ti.xcom_push(key=XCOM_RUN_ID, value=dag_run.run_id)
 
         if self.wait_for_completion:
@@ -200,7 +212,7 @@ class TriggerDagRunOperator(BaseOperator):
                     trigger=DagStateTrigger(
                         dag_id=self.trigger_dag_id,
                         states=self.allowed_states + self.failed_states,
-                        execution_dates=[parsed_execution_date],
+                        execution_dates=[parsed_logical_date],
                         poll_interval=self.poke_interval,
                     ),
                     method_name="execute_complete",
@@ -210,7 +222,7 @@ class TriggerDagRunOperator(BaseOperator):
                 self.log.info(
                     "Waiting for %s on %s to become allowed state %s ...",
                     self.trigger_dag_id,
-                    dag_run.execution_date,
+                    dag_run.logical_date,
                     self.allowed_states,
                 )
                 time.sleep(self.poke_interval)
@@ -225,17 +237,17 @@ class TriggerDagRunOperator(BaseOperator):
 
     @provide_session
     def execute_complete(self, context: Context, session: Session, event: 
tuple[str, dict[str, Any]]):
-        # This execution date is parsed from the return trigger event
-        provided_execution_date = event[1]["execution_dates"][0]
+        # This logical_date is parsed from the return trigger event
+        provided_logical_date = event[1]["execution_dates"][0]
         try:
             dag_run = session.execute(
                 select(DagRun).where(
-                    DagRun.dag_id == self.trigger_dag_id, 
DagRun.execution_date == provided_execution_date
+                    DagRun.dag_id == self.trigger_dag_id, 
DagRun.execution_date == provided_logical_date
                 )
             ).scalar_one()
         except NoResultFound:
             raise AirflowException(
-                f"No DAG run found for DAG {self.trigger_dag_id} and execution 
date {self.execution_date}"
+                f"No DAG run found for DAG {self.trigger_dag_id} and logical 
date {self.logical_date}"
             )
 
         state = dag_run.state
diff --git a/tests/operators/test_trigger_dagrun.py 
b/tests/operators/test_trigger_dagrun.py
index a90f49926e..9eed9b786e 100644
--- a/tests/operators/test_trigger_dagrun.py
+++ b/tests/operators/test_trigger_dagrun.py
@@ -110,7 +110,7 @@ class TestDagRunOperator:
         args, _ = mock_build_url.call_args
         expected_args = {
             "dag_id": triggered_dag_run.dag_id,
-            "base_date": triggered_dag_run.execution_date.isoformat(),
+            "base_date": triggered_dag_run.logical_date.isoformat(),
         }
         assert expected_args in args
 
@@ -122,7 +122,7 @@ class TestDagRunOperator:
         with create_session() as session:
             dagrun = session.query(DagRun).filter(DagRun.dag_id == 
TRIGGERED_DAG_ID).one()
             assert dagrun.external_trigger
-            assert dagrun.run_id == DagRun.generate_run_id(DagRunType.MANUAL, 
dagrun.execution_date)
+            assert dagrun.run_id == DagRun.generate_run_id(DagRunType.MANUAL, 
dagrun.logical_date)
             self.assert_extra_link(dagrun, task, session)
 
     def test_trigger_dagrun_custom_run_id(self):
@@ -139,13 +139,13 @@ class TestDagRunOperator:
             assert len(dagruns) == 1
             assert dagruns[0].run_id == "custom_run_id"
 
-    def test_trigger_dagrun_with_execution_date(self):
-        """Test TriggerDagRunOperator with custom execution_date."""
-        custom_execution_date = timezone.datetime(2021, 1, 2, 3, 4, 5)
+    def test_trigger_dagrun_with_logical_date(self):
+        """Test TriggerDagRunOperator with custom logical_date."""
+        custom_logical_date = timezone.datetime(2021, 1, 2, 3, 4, 5)
         task = TriggerDagRunOperator(
-            task_id="test_trigger_dagrun_with_execution_date",
+            task_id="test_trigger_dagrun_with_logical_date",
             trigger_dag_id=TRIGGERED_DAG_ID,
-            execution_date=custom_execution_date,
+            logical_date=custom_logical_date,
             dag=self.dag,
         )
         task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, 
ignore_ti_state=True)
@@ -153,17 +153,17 @@ class TestDagRunOperator:
         with create_session() as session:
             dagrun = session.query(DagRun).filter(DagRun.dag_id == 
TRIGGERED_DAG_ID).one()
             assert dagrun.external_trigger
-            assert dagrun.execution_date == custom_execution_date
-            assert dagrun.run_id == DagRun.generate_run_id(DagRunType.MANUAL, 
custom_execution_date)
+            assert dagrun.logical_date == custom_logical_date
+            assert dagrun.run_id == DagRun.generate_run_id(DagRunType.MANUAL, 
custom_logical_date)
             self.assert_extra_link(dagrun, task, session)
 
     def test_trigger_dagrun_twice(self):
-        """Test TriggerDagRunOperator with custom execution_date."""
+        """Test TriggerDagRunOperator with custom logical_date."""
         utc_now = timezone.utcnow()
         task = TriggerDagRunOperator(
-            task_id="test_trigger_dagrun_with_execution_date",
+            task_id="test_trigger_dagrun_with_logical_date",
             trigger_dag_id=TRIGGERED_DAG_ID,
-            execution_date=utc_now,
+            logical_date=utc_now,
             dag=self.dag,
             poke_interval=1,
             reset_dag_run=True,
@@ -186,16 +186,16 @@ class TestDagRunOperator:
             assert len(dagruns) == 1
             triggered_dag_run = dagruns[0]
             assert triggered_dag_run.external_trigger
-            assert triggered_dag_run.execution_date == utc_now
+            assert triggered_dag_run.logical_date == utc_now
             self.assert_extra_link(triggered_dag_run, task, session)
 
     def test_trigger_dagrun_with_scheduled_dag_run(self):
-        """Test TriggerDagRunOperator with custom execution_date and scheduled 
dag_run."""
+        """Test TriggerDagRunOperator with custom logical_date and scheduled 
dag_run."""
         utc_now = timezone.utcnow()
         task = TriggerDagRunOperator(
-            task_id="test_trigger_dagrun_with_execution_date",
+            task_id="test_trigger_dagrun_with_logical_date",
             trigger_dag_id=TRIGGERED_DAG_ID,
-            execution_date=utc_now,
+            logical_date=utc_now,
             dag=self.dag,
             poke_interval=1,
             reset_dag_run=True,
@@ -218,15 +218,15 @@ class TestDagRunOperator:
             assert len(dagruns) == 1
             triggered_dag_run = dagruns[0]
             assert triggered_dag_run.external_trigger
-            assert triggered_dag_run.execution_date == utc_now
+            assert triggered_dag_run.logical_date == utc_now
             self.assert_extra_link(triggered_dag_run, task, session)
 
-    def test_trigger_dagrun_with_templated_execution_date(self):
-        """Test TriggerDagRunOperator with templated execution_date."""
+    def test_trigger_dagrun_with_templated_logical_date(self):
+        """Test TriggerDagRunOperator with templated logical_date."""
         task = TriggerDagRunOperator(
-            task_id="test_trigger_dagrun_with_str_execution_date",
+            task_id="test_trigger_dagrun_with_str_logical_date",
             trigger_dag_id=TRIGGERED_DAG_ID,
-            execution_date="{{ logical_date }}",
+            logical_date="{{ logical_date }}",
             dag=self.dag,
         )
         task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, 
ignore_ti_state=True)
@@ -236,13 +236,13 @@ class TestDagRunOperator:
             assert len(dagruns) == 1
             triggered_dag_run = dagruns[0]
             assert triggered_dag_run.external_trigger
-            assert triggered_dag_run.execution_date == DEFAULT_DATE
+            assert triggered_dag_run.logical_date == DEFAULT_DATE
             self.assert_extra_link(triggered_dag_run, task, session)
 
     def test_trigger_dagrun_operator_conf(self):
         """Test passing conf to the triggered DagRun."""
         task = TriggerDagRunOperator(
-            task_id="test_trigger_dagrun_with_str_execution_date",
+            task_id="test_trigger_dagrun_with_str_logical_date",
             trigger_dag_id=TRIGGERED_DAG_ID,
             conf={"foo": "bar"},
             dag=self.dag,
@@ -268,7 +268,7 @@ class TestDagRunOperator:
     def test_trigger_dagrun_operator_templated_conf(self):
         """Test passing a templated conf to the triggered DagRun."""
         task = TriggerDagRunOperator(
-            task_id="test_trigger_dagrun_with_str_execution_date",
+            task_id="test_trigger_dagrun_with_str_logical_date",
             trigger_dag_id=TRIGGERED_DAG_ID,
             conf={"foo": "{{ dag.dag_id }}"},
             dag=self.dag,
@@ -282,48 +282,48 @@ class TestDagRunOperator:
 
     def test_trigger_dagrun_with_reset_dag_run_false(self):
         """Test TriggerDagRunOperator without reset_dag_run."""
-        execution_date = DEFAULT_DATE
+        logical_date = DEFAULT_DATE
         task = TriggerDagRunOperator(
             task_id="test_task",
             trigger_dag_id=TRIGGERED_DAG_ID,
             trigger_run_id=None,
-            execution_date=None,
+            logical_date=None,
             reset_dag_run=False,
             dag=self.dag,
         )
-        task.run(start_date=execution_date, end_date=execution_date, 
ignore_ti_state=True)
-        task.run(start_date=execution_date, end_date=execution_date, 
ignore_ti_state=True)
+        task.run(start_date=logical_date, end_date=logical_date, 
ignore_ti_state=True)
+        task.run(start_date=logical_date, end_date=logical_date, 
ignore_ti_state=True)
 
         with create_session() as session:
             dagruns = session.query(DagRun).filter(DagRun.dag_id == 
TRIGGERED_DAG_ID).all()
             assert len(dagruns) == 2
 
     @pytest.mark.parametrize(
-        "trigger_run_id, trigger_execution_date",
+        "trigger_run_id, trigger_logical_date",
         [
             (None, DEFAULT_DATE),
             ("dummy_run_id", None),
             ("dummy_run_id", DEFAULT_DATE),
         ],
     )
-    def test_trigger_dagrun_with_reset_dag_run_false_fail(self, 
trigger_run_id, trigger_execution_date):
+    def test_trigger_dagrun_with_reset_dag_run_false_fail(self, 
trigger_run_id, trigger_logical_date):
         """Test TriggerDagRunOperator without reset_dag_run but triggered dag 
fails."""
-        execution_date = DEFAULT_DATE
+        logical_date = DEFAULT_DATE
         task = TriggerDagRunOperator(
             task_id="test_task",
             trigger_dag_id=TRIGGERED_DAG_ID,
             trigger_run_id=trigger_run_id,
-            execution_date=trigger_execution_date,
+            logical_date=trigger_logical_date,
             reset_dag_run=False,
             dag=self.dag,
         )
-        task.run(start_date=execution_date, end_date=execution_date, 
ignore_ti_state=True)
+        task.run(start_date=logical_date, end_date=logical_date, 
ignore_ti_state=True)
 
         with pytest.raises(DagRunAlreadyExists):
-            task.run(start_date=execution_date, end_date=execution_date, 
ignore_ti_state=True)
+            task.run(start_date=logical_date, end_date=logical_date, 
ignore_ti_state=True)
 
     @pytest.mark.parametrize(
-        "trigger_run_id, trigger_execution_date, expected_dagruns_count",
+        "trigger_run_id, trigger_logical_date, expected_dagruns_count",
         [
             (None, DEFAULT_DATE, 1),
             (None, None, 2),
@@ -332,20 +332,20 @@ class TestDagRunOperator:
         ],
     )
     def test_trigger_dagrun_with_reset_dag_run_true(
-        self, trigger_run_id, trigger_execution_date, expected_dagruns_count
+        self, trigger_run_id, trigger_logical_date, expected_dagruns_count
     ):
         """Test TriggerDagRunOperator with reset_dag_run."""
-        execution_date = DEFAULT_DATE
+        logical_date = DEFAULT_DATE
         task = TriggerDagRunOperator(
             task_id="test_task",
             trigger_dag_id=TRIGGERED_DAG_ID,
             trigger_run_id=trigger_run_id,
-            execution_date=trigger_execution_date,
+            logical_date=trigger_logical_date,
             reset_dag_run=True,
             dag=self.dag,
         )
-        task.run(start_date=execution_date, end_date=execution_date, 
ignore_ti_state=True)
-        task.run(start_date=execution_date, end_date=execution_date, 
ignore_ti_state=True)
+        task.run(start_date=logical_date, end_date=logical_date, 
ignore_ti_state=True)
+        task.run(start_date=logical_date, end_date=logical_date, 
ignore_ti_state=True)
 
         with create_session() as session:
             dag_runs = session.query(DagRun).filter(DagRun.dag_id == 
TRIGGERED_DAG_ID).all()
@@ -354,17 +354,17 @@ class TestDagRunOperator:
 
     def test_trigger_dagrun_with_wait_for_completion_true(self):
         """Test TriggerDagRunOperator with wait_for_completion."""
-        execution_date = DEFAULT_DATE
+        logical_date = DEFAULT_DATE
         task = TriggerDagRunOperator(
             task_id="test_task",
             trigger_dag_id=TRIGGERED_DAG_ID,
-            execution_date=execution_date,
+            logical_date=logical_date,
             wait_for_completion=True,
             poke_interval=10,
             allowed_states=[State.QUEUED],
             dag=self.dag,
         )
-        task.run(start_date=execution_date, end_date=execution_date)
+        task.run(start_date=logical_date, end_date=logical_date)
 
         with create_session() as session:
             dagruns = session.query(DagRun).filter(DagRun.dag_id == 
TRIGGERED_DAG_ID).all()
@@ -372,28 +372,28 @@ class TestDagRunOperator:
 
     def test_trigger_dagrun_with_wait_for_completion_true_fail(self):
         """Test TriggerDagRunOperator with wait_for_completion but triggered 
dag fails."""
-        execution_date = DEFAULT_DATE
+        logical_date = DEFAULT_DATE
         task = TriggerDagRunOperator(
             task_id="test_task",
             trigger_dag_id=TRIGGERED_DAG_ID,
-            execution_date=execution_date,
+            logical_date=logical_date,
             wait_for_completion=True,
             poke_interval=10,
             failed_states=[State.QUEUED],
             dag=self.dag,
         )
         with pytest.raises(AirflowException):
-            task.run(start_date=execution_date, end_date=execution_date)
+            task.run(start_date=logical_date, end_date=logical_date)
 
     def test_trigger_dagrun_triggering_itself(self):
         """Test TriggerDagRunOperator that triggers itself"""
-        execution_date = DEFAULT_DATE
+        logical_date = DEFAULT_DATE
         task = TriggerDagRunOperator(
             task_id="test_task",
             trigger_dag_id=self.dag.dag_id,
             dag=self.dag,
         )
-        task.run(start_date=execution_date, end_date=execution_date)
+        task.run(start_date=logical_date, end_date=logical_date)
 
         with create_session() as session:
             dagruns = (
@@ -407,33 +407,33 @@ class TestDagRunOperator:
             assert triggered_dag_run.state == State.QUEUED
             self.assert_extra_link(triggered_dag_run, task, session)
 
-    def test_trigger_dagrun_triggering_itself_with_execution_date(self):
-        """Test TriggerDagRunOperator that triggers itself with execution date,
+    def test_trigger_dagrun_triggering_itself_with_logical_date(self):
+        """Test TriggerDagRunOperator that triggers itself with logical date,
         fails with DagRunAlreadyExists"""
-        execution_date = DEFAULT_DATE
+        logical_date = DEFAULT_DATE
         task = TriggerDagRunOperator(
             task_id="test_task",
             trigger_dag_id=self.dag.dag_id,
-            execution_date=execution_date,
+            logical_date=logical_date,
             dag=self.dag,
         )
         with pytest.raises(DagRunAlreadyExists):
-            task.run(start_date=execution_date, end_date=execution_date)
+            task.run(start_date=logical_date, end_date=logical_date)
 
     def test_trigger_dagrun_with_wait_for_completion_true_defer_false(self):
         """Test TriggerDagRunOperator with wait_for_completion."""
-        execution_date = DEFAULT_DATE
+        logical_date = DEFAULT_DATE
         task = TriggerDagRunOperator(
             task_id="test_task",
             trigger_dag_id=TRIGGERED_DAG_ID,
-            execution_date=execution_date,
+            logical_date=logical_date,
             wait_for_completion=True,
             poke_interval=10,
             allowed_states=[State.QUEUED],
             deferrable=False,
             dag=self.dag,
         )
-        task.run(start_date=execution_date, end_date=execution_date)
+        task.run(start_date=logical_date, end_date=logical_date)
 
         with create_session() as session:
             dagruns = session.query(DagRun).filter(DagRun.dag_id == 
TRIGGERED_DAG_ID).all()
@@ -441,11 +441,11 @@ class TestDagRunOperator:
 
     def test_trigger_dagrun_with_wait_for_completion_true_defer_true(self):
         """Test TriggerDagRunOperator with wait_for_completion."""
-        execution_date = DEFAULT_DATE
+        logical_date = DEFAULT_DATE
         task = TriggerDagRunOperator(
             task_id="test_task",
             trigger_dag_id=TRIGGERED_DAG_ID,
-            execution_date=execution_date,
+            logical_date=logical_date,
             wait_for_completion=True,
             poke_interval=10,
             allowed_states=[State.QUEUED],
@@ -453,7 +453,7 @@ class TestDagRunOperator:
             dag=self.dag,
         )
 
-        task.run(start_date=execution_date, end_date=execution_date)
+        task.run(start_date=logical_date, end_date=logical_date)
 
         with create_session() as session:
             dagruns = session.query(DagRun).filter(DagRun.dag_id == 
TRIGGERED_DAG_ID).all()
@@ -469,11 +469,11 @@ class TestDagRunOperator:
 
     def 
test_trigger_dagrun_with_wait_for_completion_true_defer_true_failure(self):
         """Test TriggerDagRunOperator wait_for_completion dag run in non 
defined state."""
-        execution_date = DEFAULT_DATE
+        logical_date = DEFAULT_DATE
         task = TriggerDagRunOperator(
             task_id="test_task",
             trigger_dag_id=TRIGGERED_DAG_ID,
-            execution_date=execution_date,
+            logical_date=logical_date,
             wait_for_completion=True,
             poke_interval=10,
             allowed_states=[State.SUCCESS],
@@ -481,7 +481,7 @@ class TestDagRunOperator:
             dag=self.dag,
         )
 
-        task.run(start_date=execution_date, end_date=execution_date)
+        task.run(start_date=logical_date, end_date=logical_date)
 
         with create_session() as session:
             dagruns = session.query(DagRun).filter(DagRun.dag_id == 
TRIGGERED_DAG_ID).all()
@@ -501,11 +501,11 @@ class TestDagRunOperator:
 
     def 
test_trigger_dagrun_with_wait_for_completion_true_defer_true_failure_2(self):
         """Test TriggerDagRunOperator  wait_for_completion dag run in failed 
state."""
-        execution_date = DEFAULT_DATE
+        logical_date = DEFAULT_DATE
         task = TriggerDagRunOperator(
             task_id="test_task",
             trigger_dag_id=TRIGGERED_DAG_ID,
-            execution_date=execution_date,
+            logical_date=logical_date,
             wait_for_completion=True,
             poke_interval=10,
             allowed_states=[State.SUCCESS],
@@ -514,7 +514,7 @@ class TestDagRunOperator:
             dag=self.dag,
         )
 
-        task.run(start_date=execution_date, end_date=execution_date)
+        task.run(start_date=logical_date, end_date=logical_date)
 
         with create_session() as session:
             dagruns = session.query(DagRun).filter(DagRun.dag_id == 
TRIGGERED_DAG_ID).all()
@@ -529,3 +529,21 @@ class TestDagRunOperator:
 
         with pytest.raises(AirflowException, match="failed with failed state"):
             task.execute_complete(context={}, event=trigger.serialize())
+
+    def test_trigger_dagrun_with_execution_date(self):
+        """Test TriggerDagRunOperator with custom execution_date (deprecated 
parameter)"""
+        custom_execution_date = timezone.datetime(2021, 1, 2, 3, 4, 5)
+        task = TriggerDagRunOperator(
+            task_id="test_trigger_dagrun_with_execution_date",
+            trigger_dag_id=TRIGGERED_DAG_ID,
+            execution_date=custom_execution_date,
+            dag=self.dag,
+        )
+        task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, 
ignore_ti_state=True)
+
+        with create_session() as session:
+            dagrun = session.query(DagRun).filter(DagRun.dag_id == 
TRIGGERED_DAG_ID).one()
+            assert dagrun.external_trigger
+            assert dagrun.logical_date == custom_execution_date
+            assert dagrun.run_id == DagRun.generate_run_id(DagRunType.MANUAL, 
custom_execution_date)
+            self.assert_extra_link(dagrun, task, session)

Reply via email to