BasPH commented on a change in pull request #6317: [AIRFLOW-5644] Simplify TriggerDagRunOperator usage URL: https://github.com/apache/airflow/pull/6317#discussion_r337730794
########## File path: airflow/operators/dagrun_operator.py ########## @@ -18,81 +18,64 @@ # under the License. import datetime -import json -from typing import Callable, Dict, Optional, Union +from typing import Dict, Optional, Union from airflow.api.common.experimental.trigger_dag import trigger_dag from airflow.models import BaseOperator from airflow.utils import timezone from airflow.utils.decorators import apply_defaults -class DagRunOrder: - def __init__(self, run_id=None, payload=None): - self.run_id = run_id - self.payload = payload - - class TriggerDagRunOperator(BaseOperator): """ Triggers a DAG run for a specified ``dag_id`` :param trigger_dag_id: the dag_id to trigger (templated) :type trigger_dag_id: str - :param python_callable: a reference to a python function that will be - called while passing it the ``context`` object and a placeholder - object ``obj`` for your callable to fill and return if you want - a DagRun created. This ``obj`` object contains a ``run_id`` and - ``payload`` attribute that you can modify in your function. - The ``run_id`` should be a unique identifier for that DAG run, and - the payload has to be a picklable object that will be made available - to your tasks while executing that DAG run. Your function header - should look like ``def foo(context, dag_run_obj):`` - :type python_callable: python callable + :param conf: Configuration for the DAG run + :type conf: dict :param execution_date: Execution date for the dag (templated) :type execution_date: str or datetime.datetime """ - template_fields = ('trigger_dag_id', 'execution_date') - ui_color = '#ffefeb' + + template_fields = ("trigger_dag_id", "execution_date", "conf") + ui_color = "#ffefeb" @apply_defaults def __init__( - self, - trigger_dag_id: str, - python_callable: Optional[Callable[[Dict, DagRunOrder], DagRunOrder]] = None, - execution_date: Optional[Union[str, datetime.datetime]] = None, - *args, **kwargs) -> None: + self, + trigger_dag_id: str, + conf: Optional[Dict] = None, + execution_date: Optional[Union[str, datetime.datetime]] = None, + *args, + **kwargs + ) -> None: super().__init__(*args, **kwargs) - self.python_callable = python_callable self.trigger_dag_id = trigger_dag_id + self.conf = conf - self.execution_date = None # type: Optional[Union[str, datetime.datetime]] - if isinstance(execution_date, datetime.datetime): - self.execution_date = execution_date.isoformat() - elif isinstance(execution_date, str): + if execution_date is None or isinstance(execution_date, (str, datetime.datetime)): self.execution_date = execution_date - elif execution_date is None: - self.execution_date = None else: raise TypeError( - 'Expected str or datetime.datetime type ' - 'for execution_date. Got {}'.format( - type(execution_date))) + "Expected str or datetime.datetime type for execution_date. " + "Got {}".format(type(execution_date)) + ) - def execute(self, context): - if self.execution_date is not None: - run_id = 'trig__{}'.format(self.execution_date) - self.execution_date = timezone.parse(self.execution_date) + def execute(self, context: Dict): + if isinstance(self.execution_date, datetime.datetime): + run_id = "trig__{}".format(self.execution_date.isoformat()) + elif isinstance(self.execution_date, str): + run_id = "trig__{}".format(self.execution_date) + self.execution_date = timezone.parse(self.execution_date) # trigger_dag() expects datetime Review comment: Would like to use it as it is much shorter and simpler. However, the `execution_date` can be templated. For example `{{ execution_date }}` will fail in `timezone.parse()`. So, we have to save it first, wait for `execute()` to be called and all variables to be templated, and only then can we call `timezone.parse()` on the `execution_date` :( ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services