Repository: incubator-airflow
Updated Branches:
  refs/heads/master 7945854cc -> 3ed25a945


[AIRFLOW-2517] backfill support passing key values through CLI

### JIRA
- [x] My PR addresses the following [Airflow JIRA]
(https://issues.apache.org/jira/browse/AIRFLOW/)
issues and references them in the PR title. For
example, "\[AIRFLOW-XXX\] My Airflow PR"
    -
https://issues.apache.org/jira/browse/AIRFLOW-2517
    - In case you are fixing a typo in the
documentation you can prepend your commit with
\[AIRFLOW-XXX\], code changes always need a JIRA
issue.

### Description
- [x] Here are some details about my PR, including
screenshots of any UI changes:
In backfill, we can provide key-value pairs
through the CLI, and those pairs can be accessed
through macros. This works just like
`trigger_dag -c` [1].

Let's walk through an example.

In the airflow CLI we specify a key-value pair.
```
airflow backfill hello_world -s 2018-02-01 -e
2018-02-08 -c '{"text": "some text"}'
```

In the DAG file, I have a `BashOperator` that
contains a templated command, and I want
{{ dag_run.conf.text }} to resolve to the text I
passed in the CLI.
```python
templated_command = """
    echo "ds = {{ ds }}"
    echo "prev_ds = {{
macros.datetime.strftime(prev_execution_date,
"%Y-%m-%d") }}"
    echo "next_ds = {{
macros.datetime.strftime(next_execution_date,
"%Y-%m-%d") }}"
    echo "text_through_conf = {{ dag_run.conf.text }}"
"""

bash_operator = BashOperator(
    task_id='bash_task',
    bash_command=templated_command,
    dag=dag
    )
```
Rendered Bash command in Airflow UI.
<img width="1246" alt="screen shot 2018-05-22 at 4
33 59 pm" src="https://user-images.githubuserconte
nt.com/6065051/40395666-04c41574-5dde-11e8-9ec2-c0
312b7203e6.png">

[1]
https://airflow.apache.org/cli.html#trigger_dag

### Tests
- [x] My PR adds the following unit tests __OR__
does not need testing for this extremely good
reason:

### Commits
- [x] My commits all reference JIRA issues in
their subject lines, and I have squashed multiple
commits if they address the same issue. In
addition, my commits follow the guidelines from
"[How to write a good git commit
message](http://chris.beams.io/posts/git-
commit/)":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not
"adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

### Documentation
- [x] In case of new functionality, my PR adds
documentation that describes how to use it.
    - When adding new operators/hooks/sensors, the
autoclass documentation generation needs to be
added.

### Code Quality
- [x] Passes `git diff upstream/master -u --
"*.py" | flake8 --diff`

Closes #3406 from milton0825/backfill-support-conf


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3ed25a94
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3ed25a94
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3ed25a94

Branch: refs/heads/master
Commit: 3ed25a9459a2a0b26b849b1a3ea7906f26a33707
Parents: 7945854
Author: Chao-Han Tsai <[email protected]>
Authored: Wed May 30 10:50:06 2018 -0700
Committer: Maxime Beauchemin <[email protected]>
Committed: Wed May 30 10:50:06 2018 -0700

----------------------------------------------------------------------
 airflow/bin/cli.py |  7 ++++++-
 airflow/jobs.py    |  5 ++++-
 airflow/models.py  |  4 ++++
 tests/jobs.py      | 28 ++++++++++++++++++++++++++++
 4 files changed, 42 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3ed25a94/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 55201dd..10c104a 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -180,6 +180,10 @@ def backfill(args, dag=None):
             task_regex=args.task_regex,
             include_upstream=not args.ignore_dependencies)
 
+    run_conf = None
+    if args.conf:
+        run_conf = json.loads(args.conf)
+
     if args.dry_run:
         print("Dry run of DAG {0} on {1}".format(args.dag_id,
                                                  args.start_date))
@@ -200,6 +204,7 @@ def backfill(args, dag=None):
             pool=args.pool,
             delay_on_limit_secs=args.delay_on_limit,
             verbose=args.verbose,
+            conf=run_conf,
         )
 
 
@@ -1678,7 +1683,7 @@ class CLIFactory(object):
                 'dag_id', 'task_regex', 'start_date', 'end_date',
                 'mark_success', 'local', 'donot_pickle',
                 'bf_ignore_dependencies', 'bf_ignore_first_depends_on_past',
-                'subdir', 'pool', 'delay_on_limit', 'dry_run', 'verbose',
+                'subdir', 'pool', 'delay_on_limit', 'dry_run', 'verbose', 
'conf'
             )
         }, {
             'func': list_tasks,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3ed25a94/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index a019879..827349a 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1956,6 +1956,7 @@ class BackfillJob(BaseJob):
             pool=None,
             delay_on_limit_secs=1.0,
             verbose=False,
+            conf=None,
             *args, **kwargs):
         self.dag = dag
         self.dag_id = dag.dag_id
@@ -1968,6 +1969,7 @@ class BackfillJob(BaseJob):
         self.pool = pool
         self.delay_on_limit_secs = delay_on_limit_secs
         self.verbose = verbose
+        self.conf = conf
         super(BackfillJob, self).__init__(*args, **kwargs)
 
     def _update_counters(self, ti_status):
@@ -2090,7 +2092,8 @@ class BackfillJob(BaseJob):
             start_date=timezone.utcnow(),
             state=State.RUNNING,
             external_trigger=False,
-            session=session
+            session=session,
+            conf=self.conf,
         )
 
         # set required transient field

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3ed25a94/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 6038d79..2511989 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -3973,6 +3973,7 @@ class DAG(BaseDag, LoggingMixin):
             pool=None,
             delay_on_limit_secs=1.0,
             verbose=False,
+            conf=None,
     ):
         """
         Runs the DAG.
@@ -4001,6 +4002,8 @@ class DAG(BaseDag, LoggingMixin):
         :type delay_on_limit_secs: float
         :param verbose: Make logging output more verbose
         :type verbose: boolean
+        :param conf: user defined dictionary passed from CLI
+        :type conf: dict
         """
         from airflow.jobs import BackfillJob
         if not executor and local:
@@ -4019,6 +4022,7 @@ class DAG(BaseDag, LoggingMixin):
             pool=pool,
             delay_on_limit_secs=delay_on_limit_secs,
             verbose=verbose,
+            conf=conf,
         )
         job.run()
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3ed25a94/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 504d149..8e7f056 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -23,6 +23,7 @@ from __future__ import print_function
 from __future__ import unicode_literals
 
 import datetime
+import json
 import logging
 import multiprocessing
 import os
@@ -186,6 +187,33 @@ class BackfillJobTest(unittest.TestCase):
                 ignore_first_depends_on_past=True)
             job.run()
 
+    def test_backfill_conf(self):
+        dag = DAG(
+            dag_id='test_backfill_conf',
+            start_date=DEFAULT_DATE,
+            schedule_interval='@daily')
+
+        with dag:
+            DummyOperator(
+                task_id='op',
+                dag=dag)
+
+        dag.clear()
+
+        executor = TestExecutor(do_update=True)
+
+        conf = json.loads("""{"key": "value"}""")
+        job = BackfillJob(dag=dag,
+                          executor=executor,
+                          start_date=DEFAULT_DATE,
+                          end_date=DEFAULT_DATE + datetime.timedelta(days=2),
+                          conf=conf)
+        job.run()
+
+        dr = DagRun.find(dag_id='test_backfill_conf')
+
+        self.assertEqual(conf, dr[0].conf)
+
     def test_backfill_ordered_concurrent_execute(self):
         dag = DAG(
             dag_id='test_backfill_ordered_concurrent_execute',

Reply via email to