Repository: incubator-airflow Updated Branches: refs/heads/master 1e82e11ae -> 2b030699d
[AIRFLOW-1652] Push DatabricksRunSubmitOperator metadata into XCOM

[AIRFLOW-1652] Push DatabricksRunSubmitOperator metadata into XCOM

Push run_id and run_page_url into xcom so callbacks and other tasks can reference this information

address comments

Closes #2641 from andrewmchen/databricks-xcom

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2b030699
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2b030699
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2b030699

Branch: refs/heads/master
Commit: 2b030699de351c557a646e8e620c1b96a0397070
Parents: 1e82e11
Author: Andrew Chen <[email protected]>
Authored: Mon Apr 23 19:14:27 2018 +0200
Committer: Bolke de Bruin <[email protected]>
Committed: Mon Apr 23 19:14:27 2018 +0200

----------------------------------------------------------------------
 airflow/contrib/operators/databricks_operator.py | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2b030699/airflow/contrib/operators/databricks_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/databricks_operator.py b/airflow/contrib/operators/databricks_operator.py
index d32dab9..2aa1ef2 100644
--- a/airflow/contrib/operators/databricks_operator.py
+++ b/airflow/contrib/operators/databricks_operator.py
@@ -26,6 +26,10 @@ from airflow.contrib.hooks.databricks_hook import DatabricksHook
 from airflow.models import BaseOperator
 
 
+XCOM_RUN_ID_KEY = 'run_id'
+XCOM_RUN_PAGE_URL_KEY = 'run_page_url'
+
+
 class DatabricksSubmitRunOperator(BaseOperator):
     """
     Submits an Spark job run to Databricks using the
@@ -143,6 +147,8 @@ class DatabricksSubmitRunOperator(BaseOperator):
     :param databricks_retry_limit: Amount of times retry if the
         Databricks backend is unreachable. Its value must be greater than or equal to 1.
     :type databricks_retry_limit: int
+    :param do_xcom_push: Whether we should push run_id and run_page_url to xcom.
+    :type do_xcom_push: boolean
     """
     # Used in airflow.models.BaseOperator
     template_fields = ('json',)
@@ -163,6 +169,7 @@ class DatabricksSubmitRunOperator(BaseOperator):
             databricks_conn_id='databricks_default',
             polling_period_seconds=30,
             databricks_retry_limit=3,
+            do_xcom_push=False,
             **kwargs):
         """
         Creates a new ``DatabricksSubmitRunOperator``.
@@ -192,6 +199,7 @@ class DatabricksSubmitRunOperator(BaseOperator):
         self.json = self._deep_string_coerce(self.json)
         # This variable will be used in case our task gets killed.
         self.run_id = None
+        self.do_xcom_push = do_xcom_push
 
     def _deep_string_coerce(self, content, json_path='json'):
         """
@@ -229,8 +237,12 @@ class DatabricksSubmitRunOperator(BaseOperator):
     def execute(self, context):
         hook = self.get_hook()
         self.run_id = hook.submit_run(self.json)
-        run_page_url = hook.get_run_page_url(self.run_id)
+        if self.do_xcom_push:
+            context['ti'].xcom_push(key=XCOM_RUN_ID_KEY, value=self.run_id)
         self.log.info('Run submitted with run_id: %s', self.run_id)
+        run_page_url = hook.get_run_page_url(self.run_id)
+        if self.do_xcom_push:
+            context['ti'].xcom_push(key=XCOM_RUN_PAGE_URL_KEY, value=run_page_url)
         self._log_run_page_url(run_page_url)
         while True:
             run_state = hook.get_run_state(self.run_id)
