BasPH commented on a change in pull request #6317: [AIRFLOW-5644] Simplify
TriggerDagRunOperator usage
URL: https://github.com/apache/airflow/pull/6317#discussion_r337683917
##########
File path: airflow/operators/dagrun_operator.py
##########
@@ -18,81 +18,64 @@
# under the License.
import datetime
-import json
-from typing import Callable, Dict, Optional, Union
+from typing import Dict, Optional, Union
from airflow.api.common.experimental.trigger_dag import trigger_dag
from airflow.models import BaseOperator
from airflow.utils import timezone
from airflow.utils.decorators import apply_defaults
-class DagRunOrder:
- def __init__(self, run_id=None, payload=None):
- self.run_id = run_id
- self.payload = payload
-
-
class TriggerDagRunOperator(BaseOperator):
"""
Triggers a DAG run for a specified ``dag_id``
:param trigger_dag_id: the dag_id to trigger (templated)
:type trigger_dag_id: str
- :param python_callable: a reference to a python function that will be
- called while passing it the ``context`` object and a placeholder
- object ``obj`` for your callable to fill and return if you want
- a DagRun created. This ``obj`` object contains a ``run_id`` and
- ``payload`` attribute that you can modify in your function.
- The ``run_id`` should be a unique identifier for that DAG run, and
- the payload has to be a picklable object that will be made available
- to your tasks while executing that DAG run. Your function header
- should look like ``def foo(context, dag_run_obj):``
- :type python_callable: python callable
+ :param conf: Configuration for the DAG run
+ :type conf: dict
:param execution_date: Execution date for the dag (templated)
:type execution_date: str or datetime.datetime
"""
- template_fields = ('trigger_dag_id', 'execution_date')
- ui_color = '#ffefeb'
+
+ template_fields = ("trigger_dag_id", "execution_date", "conf")
+ ui_color = "#ffefeb"
@apply_defaults
def __init__(
- self,
- trigger_dag_id: str,
- python_callable: Optional[Callable[[Dict, DagRunOrder],
DagRunOrder]] = None,
- execution_date: Optional[Union[str, datetime.datetime]] = None,
- *args, **kwargs) -> None:
+ self,
+ trigger_dag_id: str,
+ conf: Optional[Dict] = None,
+ execution_date: Optional[Union[str, datetime.datetime]] = None,
+ *args,
+ **kwargs
+ ) -> None:
super().__init__(*args, **kwargs)
- self.python_callable = python_callable
self.trigger_dag_id = trigger_dag_id
+ self.conf = conf
- self.execution_date = None # type: Optional[Union[str,
datetime.datetime]]
- if isinstance(execution_date, datetime.datetime):
- self.execution_date = execution_date.isoformat()
- elif isinstance(execution_date, str):
+ if execution_date is None or isinstance(execution_date, (str,
datetime.datetime)):
self.execution_date = execution_date
- elif execution_date is None:
- self.execution_date = None
else:
raise TypeError(
- 'Expected str or datetime.datetime type '
- 'for execution_date. Got {}'.format(
- type(execution_date)))
+ "Expected str or datetime.datetime type for execution_date. "
+ "Got {}".format(type(execution_date))
+ )
- def execute(self, context):
- if self.execution_date is not None:
- run_id = 'trig__{}'.format(self.execution_date)
- self.execution_date = timezone.parse(self.execution_date)
+ def execute(self, context: Dict):
+ if isinstance(self.execution_date, datetime.datetime):
+ run_id = "trig__{}".format(self.execution_date.isoformat())
+ elif isinstance(self.execution_date, str):
+ run_id = "trig__{}".format(self.execution_date)
+ self.execution_date = timezone.parse(self.execution_date) #
trigger_dag() expects datetime
Review comment:
Much simpler, much better👍
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services