anxodio closed pull request #2238: [AIRFLOW-957] Add execution_date parameter
to TriggerDagRunOperator
URL: https://github.com/apache/incubator-airflow/pull/2238
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
from a fork) once it is merged, the diff is reproduced below for the
sake of provenance:
diff --git a/airflow/operators/dagrun_operator.py b/airflow/operators/dagrun_operator.py
index c3ffa1ada7..7094c50071 100644
--- a/airflow/operators/dagrun_operator.py
+++ b/airflow/operators/dagrun_operator.py
@@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from datetime import datetime
import logging
from airflow.models import BaseOperator, DagBag
@@ -23,9 +22,27 @@
class DagRunOrder(object):
- def __init__(self, run_id=None, payload=None):
- self.run_id = run_id
+ def __init__(self, execution_date, run_id=None, payload=None):
+ self._run_id = run_id
self.payload = payload
+ self.execution_date = execution_date
+
+ @property
+ def run_id(self):
+ return self._run_id or self._auto_run_id
+
+ @run_id.setter
+ def run_id(self, value):
+ self._run_id = value
+
+ @property
+ def execution_date(self):
+ return self._execution_date
+
+ @execution_date.setter
+ def execution_date(self, dt):
+ self._execution_date = dt
+ self._auto_run_id = 'trig__%s' % dt.isoformat()
class TriggerDagRunOperator(BaseOperator):
@@ -37,8 +54,11 @@ class TriggerDagRunOperator(BaseOperator):
:param python_callable: a reference to a python function that will be
called while passing it the ``context`` object and a placeholder
object ``obj`` for your callable to fill and return if you want
- a DagRun created. This ``obj`` object contains a ``run_id`` and
- ``payload`` attribute that you can modify in your function.
+ a DagRun created. This ``obj`` object contains an
+ ``execution_date``, a ``run_id`` and ``payload`` attribute that
+ you can modify in your function.
+ The ``execution_date`` is by default the current
+ task's instance ``execution_date``.
The ``run_id`` should be a unique identifier for that DAG run, and
the payload has to be a picklable object that will be made available
to your tasks while executing that DAG run. Your function header
@@ -60,7 +80,7 @@ def __init__(
self.trigger_dag_id = trigger_dag_id
def execute(self, context):
- dro = DagRunOrder(run_id='trig__' + datetime.now().isoformat())
+ dro = DagRunOrder(context['execution_date'])
dro = self.python_callable(context, dro)
if dro:
session = settings.Session()
@@ -70,6 +90,7 @@ def execute(self, context):
run_id=dro.run_id,
state=State.RUNNING,
conf=dro.payload,
+ execution_date=dro.execution_date,
external_trigger=True)
logging.info("Creating DagRun {}".format(dr))
session.add(dr)
diff --git a/tests/core.py b/tests/core.py
index 353b847c6b..3c6e841bb9 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -446,6 +446,7 @@ def test_bash_operator_kill(self):
def test_trigger_dagrun(self):
def trigga(context, obj):
+ trigga.run_id = obj.run_id
if True:
return obj
@@ -456,6 +457,40 @@ def trigga(context, obj):
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
ignore_ti_state=True)
+ session = settings.Session()
+ new_dag_run = session.query(models.DagRun).filter(
+ models.DagRun.run_id == trigga.run_id).first()
+ self.assertEqual(new_dag_run.execution_date, DEFAULT_DATE)
+
+ def test_trigger_dagrun_order_modified(self):
+ """
+ Test TriggerDagRunOperator with changes in DagRunOrder
+ """
+ new_execution_date = datetime(2016, 1, 1)
+ new_dag_run_id = 'manual_run_id'
+ payload_key = 'message'
+ payload = {payload_key: 'Hello World'}
+
+ def trigga(context, obj):
+ obj.run_id = new_dag_run_id
+ obj.execution_date = new_execution_date
+ obj.payload = payload
+ if True:
+ return obj
+
+ t = TriggerDagRunOperator(
+ task_id='test_trigger_dagrun',
+ trigger_dag_id='example_bash_operator',
+ python_callable=trigga,
+ dag=self.dag)
+ t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
ignore_ti_state=True)
+
+ session = settings.Session()
+ new_dag_run = session.query(models.DagRun).filter(
+ models.DagRun.run_id == new_dag_run_id).first()
+ self.assertEqual(new_dag_run.execution_date, new_execution_date)
+ self.assertEqual(new_dag_run.conf[payload_key], payload[payload_key])
+
def test_dryrun(self):
t = BashOperator(
task_id='time_sensor_check',
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services