Repository: incubator-airflow
Updated Branches:
  refs/heads/master 1e82e11ae -> 2b030699d


[AIRFLOW-1652] Push DatabricksSubmitRunOperator metadata into XCom

[AIRFLOW-1652] Push DatabricksSubmitRunOperator metadata into XCom

Push run_id and run_page_url into XCom so that callbacks and other
tasks can reference this information.

Address review comments.

Closes #2641 from andrewmchen/databricks-xcom


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2b030699
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2b030699
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2b030699

Branch: refs/heads/master
Commit: 2b030699de351c557a646e8e620c1b96a0397070
Parents: 1e82e11
Author: Andrew Chen <[email protected]>
Authored: Mon Apr 23 19:14:27 2018 +0200
Committer: Bolke de Bruin <[email protected]>
Committed: Mon Apr 23 19:14:27 2018 +0200

----------------------------------------------------------------------
 airflow/contrib/operators/databricks_operator.py | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2b030699/airflow/contrib/operators/databricks_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/databricks_operator.py 
b/airflow/contrib/operators/databricks_operator.py
index d32dab9..2aa1ef2 100644
--- a/airflow/contrib/operators/databricks_operator.py
+++ b/airflow/contrib/operators/databricks_operator.py
@@ -26,6 +26,10 @@ from airflow.contrib.hooks.databricks_hook import 
DatabricksHook
 from airflow.models import BaseOperator
 
 
+XCOM_RUN_ID_KEY = 'run_id'
+XCOM_RUN_PAGE_URL_KEY = 'run_page_url'
+
+
 class DatabricksSubmitRunOperator(BaseOperator):
     """
     Submits an Spark job run to Databricks using the
@@ -143,6 +147,8 @@ class DatabricksSubmitRunOperator(BaseOperator):
     :param databricks_retry_limit: Amount of times retry if the Databricks 
backend is
         unreachable. Its value must be greater than or equal to 1.
     :type databricks_retry_limit: int
+    :param do_xcom_push: Whether we should push run_id and run_page_url to 
xcom.
+    :type do_xcom_push: boolean
     """
     # Used in airflow.models.BaseOperator
     template_fields = ('json',)
@@ -163,6 +169,7 @@ class DatabricksSubmitRunOperator(BaseOperator):
             databricks_conn_id='databricks_default',
             polling_period_seconds=30,
             databricks_retry_limit=3,
+            do_xcom_push=False,
             **kwargs):
         """
         Creates a new ``DatabricksSubmitRunOperator``.
@@ -192,6 +199,7 @@ class DatabricksSubmitRunOperator(BaseOperator):
         self.json = self._deep_string_coerce(self.json)
         # This variable will be used in case our task gets killed.
         self.run_id = None
+        self.do_xcom_push = do_xcom_push
 
     def _deep_string_coerce(self, content, json_path='json'):
         """
@@ -229,8 +237,12 @@ class DatabricksSubmitRunOperator(BaseOperator):
     def execute(self, context):
         hook = self.get_hook()
         self.run_id = hook.submit_run(self.json)
-        run_page_url = hook.get_run_page_url(self.run_id)
+        if self.do_xcom_push:
+            context['ti'].xcom_push(key=XCOM_RUN_ID_KEY, value=self.run_id)
         self.log.info('Run submitted with run_id: %s', self.run_id)
+        run_page_url = hook.get_run_page_url(self.run_id)
+        if self.do_xcom_push:
+            context['ti'].xcom_push(key=XCOM_RUN_PAGE_URL_KEY, 
value=run_page_url)
         self._log_run_page_url(run_page_url)
         while True:
             run_state = hook.get_run_state(self.run_id)

Reply via email to