BasPH commented on a change in pull request #6317: [AIRFLOW-5644] Simplify 
TriggerDagRunOperator usage
URL: https://github.com/apache/airflow/pull/6317#discussion_r337730794
 
 

 ##########
 File path: airflow/operators/dagrun_operator.py
 ##########
 @@ -18,81 +18,64 @@
 # under the License.
 
 import datetime
-import json
-from typing import Callable, Dict, Optional, Union
+from typing import Dict, Optional, Union
 
 from airflow.api.common.experimental.trigger_dag import trigger_dag
 from airflow.models import BaseOperator
 from airflow.utils import timezone
 from airflow.utils.decorators import apply_defaults
 
 
-class DagRunOrder:
-    def __init__(self, run_id=None, payload=None):
-        self.run_id = run_id
-        self.payload = payload
-
-
 class TriggerDagRunOperator(BaseOperator):
     """
     Triggers a DAG run for a specified ``dag_id``
 
     :param trigger_dag_id: the dag_id to trigger (templated)
     :type trigger_dag_id: str
-    :param python_callable: a reference to a python function that will be
-        called while passing it the ``context`` object and a placeholder
-        object ``obj`` for your callable to fill and return if you want
-        a DagRun created. This ``obj`` object contains a ``run_id`` and
-        ``payload`` attribute that you can modify in your function.
-        The ``run_id`` should be a unique identifier for that DAG run, and
-        the payload has to be a picklable object that will be made available
-        to your tasks while executing that DAG run. Your function header
-        should look like ``def foo(context, dag_run_obj):``
-    :type python_callable: python callable
+    :param conf: Configuration for the DAG run
+    :type conf: dict
     :param execution_date: Execution date for the dag (templated)
     :type execution_date: str or datetime.datetime
     """
-    template_fields = ('trigger_dag_id', 'execution_date')
-    ui_color = '#ffefeb'
+
+    template_fields = ("trigger_dag_id", "execution_date", "conf")
+    ui_color = "#ffefeb"
 
     @apply_defaults
     def __init__(
-            self,
-            trigger_dag_id: str,
-            python_callable: Optional[Callable[[Dict, DagRunOrder], 
DagRunOrder]] = None,
-            execution_date: Optional[Union[str, datetime.datetime]] = None,
-            *args, **kwargs) -> None:
+        self,
+        trigger_dag_id: str,
+        conf: Optional[Dict] = None,
+        execution_date: Optional[Union[str, datetime.datetime]] = None,
+        *args,
+        **kwargs
+    ) -> None:
         super().__init__(*args, **kwargs)
-        self.python_callable = python_callable
         self.trigger_dag_id = trigger_dag_id
+        self.conf = conf
 
-        self.execution_date = None  # type: Optional[Union[str, 
datetime.datetime]]
-        if isinstance(execution_date, datetime.datetime):
-            self.execution_date = execution_date.isoformat()
-        elif isinstance(execution_date, str):
+        if execution_date is None or isinstance(execution_date, (str, 
datetime.datetime)):
             self.execution_date = execution_date
-        elif execution_date is None:
-            self.execution_date = None
         else:
             raise TypeError(
-                'Expected str or datetime.datetime type '
-                'for execution_date. Got {}'.format(
-                    type(execution_date)))
+                "Expected str or datetime.datetime type for execution_date. "
+                "Got {}".format(type(execution_date))
+            )
 
-    def execute(self, context):
-        if self.execution_date is not None:
-            run_id = 'trig__{}'.format(self.execution_date)
-            self.execution_date = timezone.parse(self.execution_date)
+    def execute(self, context: Dict):
+        if isinstance(self.execution_date, datetime.datetime):
+            run_id = "trig__{}".format(self.execution_date.isoformat())
+        elif isinstance(self.execution_date, str):
+            run_id = "trig__{}".format(self.execution_date)
+            self.execution_date = timezone.parse(self.execution_date)  # 
trigger_dag() expects datetime
 
 Review comment:
   I would like to use it, as it is much shorter and simpler.
   
   However, `execution_date` can be templated. For example, the unrendered
template string `{{ execution_date }}` will fail in `timezone.parse()`. So we
have to store the value as given first, wait until `execute()` is called and
all templated fields have been rendered, and only then can we call
`timezone.parse()` on the `execution_date` :(

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to