Repository: incubator-airflow Updated Branches: refs/heads/master 7945854cc -> 3ed25a945
[AIRFLOW-2517] backfill support passing key values through CLI ### JIRA - [x] My PR addresses the following [Airflow JIRA](https://issues.apache.org/jira/browse/AIRFLOW/) issues and references them in the PR title. For example, "\[AIRFLOW-XXX\] My Airflow PR" - https://issues.apache.org/jira/browse/AIRFLOW-2517 - In case you are fixing a typo in the documentation you can prepend your commit with \[AIRFLOW-XXX\], code changes always need a JIRA issue. ### Description - [x] Here are some details about my PR, including screenshots of any UI changes: In backfill, we can provide key-value pairs through the CLI, and those pairs can be accessed through macros. This works just like `trigger_dag -c` [1]. Let's walk through an example. In the Airflow CLI we specify a key-value pair. ``` airflow backfill hello_world -s 2018-02-01 -e 2018-02-08 -c '{"text": "some text"}' ``` In the DAG file, I have a `BashOperator` that contains a templated command, and I want {{ dag_run.conf.text }} to resolve to the text I passed in the CLI. ```python templated_command = """ echo "ds = {{ ds }}" echo "prev_ds = {{ macros.datetime.strftime(prev_execution_date, "%Y-%m-%d") }}" echo "next_ds = {{ macros.datetime.strftime(next_execution_date, "%Y-%m-%d") }}" echo "text_through_conf = {{ dag_run.conf.text }}" """ bash_operator = BashOperator( task_id='bash_task', bash_command=templated_command, dag=dag ) ``` Rendered Bash command in the Airflow UI: <img width="1246" alt="screen shot 2018-05-22 at 4 33 59 pm" src="https://user-images.githubusercontent.com/6065051/40395666-04c41574-5dde-11e8-9ec2-c0312b7203e6.png"> [1] https://airflow.apache.org/cli.html#trigger_dag ### Tests - [x] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason: ### Commits - [x] My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. 
In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)": 1. Subject is separated from body by a blank line 2. Subject is limited to 50 characters 3. Subject does not end with a period 4. Subject uses the imperative mood ("add", not "adding") 5. Body wraps at 72 characters 6. Body explains "what" and "why", not "how" ### Documentation - [x] In case of new functionality, my PR adds documentation that describes how to use it. - When adding new operators/hooks/sensors, the autoclass documentation generation needs to be added. ### Code Quality - [x] Passes `git diff upstream/master -u -- "*.py" | flake8 --diff` Closes #3406 from milton0825/backfill-support-conf Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3ed25a94 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3ed25a94 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3ed25a94 Branch: refs/heads/master Commit: 3ed25a9459a2a0b26b849b1a3ea7906f26a33707 Parents: 7945854 Author: Chao-Han Tsai <[email protected]> Authored: Wed May 30 10:50:06 2018 -0700 Committer: Maxime Beauchemin <[email protected]> Committed: Wed May 30 10:50:06 2018 -0700 ---------------------------------------------------------------------- airflow/bin/cli.py | 7 ++++++- airflow/jobs.py | 5 ++++- airflow/models.py | 4 ++++ tests/jobs.py | 28 ++++++++++++++++++++++++++++ 4 files changed, 42 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3ed25a94/airflow/bin/cli.py ---------------------------------------------------------------------- diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index 55201dd..10c104a 100644 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -180,6 +180,10 @@ def backfill(args, dag=None): 
task_regex=args.task_regex, include_upstream=not args.ignore_dependencies) + run_conf = None + if args.conf: + run_conf = json.loads(args.conf) + if args.dry_run: print("Dry run of DAG {0} on {1}".format(args.dag_id, args.start_date)) @@ -200,6 +204,7 @@ def backfill(args, dag=None): pool=args.pool, delay_on_limit_secs=args.delay_on_limit, verbose=args.verbose, + conf=run_conf, ) @@ -1678,7 +1683,7 @@ class CLIFactory(object): 'dag_id', 'task_regex', 'start_date', 'end_date', 'mark_success', 'local', 'donot_pickle', 'bf_ignore_dependencies', 'bf_ignore_first_depends_on_past', - 'subdir', 'pool', 'delay_on_limit', 'dry_run', 'verbose', + 'subdir', 'pool', 'delay_on_limit', 'dry_run', 'verbose', 'conf' ) }, { 'func': list_tasks, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3ed25a94/airflow/jobs.py ---------------------------------------------------------------------- diff --git a/airflow/jobs.py b/airflow/jobs.py index a019879..827349a 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -1956,6 +1956,7 @@ class BackfillJob(BaseJob): pool=None, delay_on_limit_secs=1.0, verbose=False, + conf=None, *args, **kwargs): self.dag = dag self.dag_id = dag.dag_id @@ -1968,6 +1969,7 @@ class BackfillJob(BaseJob): self.pool = pool self.delay_on_limit_secs = delay_on_limit_secs self.verbose = verbose + self.conf = conf super(BackfillJob, self).__init__(*args, **kwargs) def _update_counters(self, ti_status): @@ -2090,7 +2092,8 @@ class BackfillJob(BaseJob): start_date=timezone.utcnow(), state=State.RUNNING, external_trigger=False, - session=session + session=session, + conf=self.conf, ) # set required transient field http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3ed25a94/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index 6038d79..2511989 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -3973,6 +3973,7 @@ class DAG(BaseDag, LoggingMixin): 
pool=None, delay_on_limit_secs=1.0, verbose=False, + conf=None, ): """ Runs the DAG. @@ -4001,6 +4002,8 @@ class DAG(BaseDag, LoggingMixin): :type delay_on_limit_secs: float :param verbose: Make logging output more verbose :type verbose: boolean + :param conf: user defined dictionary passed from CLI + :type conf: dict """ from airflow.jobs import BackfillJob if not executor and local: @@ -4019,6 +4022,7 @@ class DAG(BaseDag, LoggingMixin): pool=pool, delay_on_limit_secs=delay_on_limit_secs, verbose=verbose, + conf=conf, ) job.run() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3ed25a94/tests/jobs.py ---------------------------------------------------------------------- diff --git a/tests/jobs.py b/tests/jobs.py index 504d149..8e7f056 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -23,6 +23,7 @@ from __future__ import print_function from __future__ import unicode_literals import datetime +import json import logging import multiprocessing import os @@ -186,6 +187,33 @@ class BackfillJobTest(unittest.TestCase): ignore_first_depends_on_past=True) job.run() + def test_backfill_conf(self): + dag = DAG( + dag_id='test_backfill_conf', + start_date=DEFAULT_DATE, + schedule_interval='@daily') + + with dag: + DummyOperator( + task_id='op', + dag=dag) + + dag.clear() + + executor = TestExecutor(do_update=True) + + conf = json.loads("""{"key": "value"}""") + job = BackfillJob(dag=dag, + executor=executor, + start_date=DEFAULT_DATE, + end_date=DEFAULT_DATE + datetime.timedelta(days=2), + conf=conf) + job.run() + + dr = DagRun.find(dag_id='test_backfill_conf') + + self.assertEqual(conf, dr[0].conf) + def test_backfill_ordered_concurrent_execute(self): dag = DAG( dag_id='test_backfill_ordered_concurrent_execute',
