This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 95169d1 Add a link to Databricks Job Run (#22541)
95169d1 is described below
commit 95169d1d07e66a8c7647e5b0f6a14cea57d515fc
Author: Alex Ott <[email protected]>
AuthorDate: Sun Mar 27 22:29:04 2022 +0200
Add a link to Databricks Job Run (#22541)
It will be easier for users/admins to go to the specific run of
Databricks Job
---
.../providers/databricks/operators/databricks.py | 24 +++++++++++++++++-----
airflow/providers/databricks/provider.yaml | 3 +++
.../providers/databricks/hooks/test_databricks.py | 2 +-
3 files changed, 23 insertions(+), 6 deletions(-)
diff --git a/airflow/providers/databricks/operators/databricks.py b/airflow/providers/databricks/operators/databricks.py
index ec0a9a0..7636842 100644
--- a/airflow/providers/databricks/operators/databricks.py
+++ b/airflow/providers/databricks/operators/databricks.py
@@ -22,7 +22,7 @@ import time
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Union
from airflow.exceptions import AirflowException
-from airflow.models import BaseOperator
+from airflow.models import BaseOperator, BaseOperatorLink, TaskInstance
from airflow.providers.databricks.hooks.databricks import DatabricksHook
if TYPE_CHECKING:
@@ -70,11 +70,11 @@ def _handle_databricks_operator_execution(operator, hook, log, context) -> None:
:param operator: Databricks operator being handled
:param context: Airflow context
"""
- if operator.do_xcom_push:
+ if operator.do_xcom_push and context is not None:
context['ti'].xcom_push(key=XCOM_RUN_ID_KEY, value=operator.run_id)
log.info('Run submitted with run_id: %s', operator.run_id)
run_page_url = hook.get_run_page_url(operator.run_id)
- if operator.do_xcom_push:
+ if operator.do_xcom_push and context is not None:
context['ti'].xcom_push(key=XCOM_RUN_PAGE_URL_KEY, value=run_page_url)
if operator.wait_for_termination:
@@ -102,6 +102,18 @@ def _handle_databricks_operator_execution(operator, hook, log, context) -> None:
log.info('View run status, Spark UI, and logs at %s', run_page_url)
+class DatabricksJobRunLink(BaseOperatorLink):
+ """Constructs a link to monitor a Databricks Job Run."""
+
+ name = "See Databricks Job Run"
+
+ def get_link(self, operator, dttm):
+ ti = TaskInstance(task=operator, execution_date=dttm)
+ run_page_url = ti.xcom_pull(task_ids=operator.task_id, key=XCOM_RUN_PAGE_URL_KEY)
+
+ return run_page_url
+
+
class DatabricksSubmitRunOperator(BaseOperator):
"""
Submits a Spark job run to Databricks using the
@@ -255,6 +267,7 @@ class DatabricksSubmitRunOperator(BaseOperator):
# Databricks brand color (blue) under white text
ui_color = '#1CB1C2'
ui_fgcolor = '#fff'
+ operator_extra_links = (DatabricksJobRunLink(),)
def __init__(
self,
@@ -276,7 +289,7 @@ class DatabricksSubmitRunOperator(BaseOperator):
databricks_retry_limit: int = 3,
databricks_retry_delay: int = 1,
databricks_retry_args: Optional[Dict[Any, Any]] = None,
- do_xcom_push: bool = False,
+ do_xcom_push: bool = True,
idempotency_token: Optional[str] = None,
access_control_list: Optional[List[Dict[str, str]]] = None,
wait_for_termination: bool = True,
@@ -498,6 +511,7 @@ class DatabricksRunNowOperator(BaseOperator):
# Databricks brand color (blue) under white text
ui_color = '#1CB1C2'
ui_fgcolor = '#fff'
+ operator_extra_links = (DatabricksJobRunLink(),)
def __init__(
self,
@@ -514,7 +528,7 @@ class DatabricksRunNowOperator(BaseOperator):
databricks_retry_limit: int = 3,
databricks_retry_delay: int = 1,
databricks_retry_args: Optional[Dict[Any, Any]] = None,
- do_xcom_push: bool = False,
+ do_xcom_push: bool = True,
wait_for_termination: bool = True,
**kwargs,
) -> None:
diff --git a/airflow/providers/databricks/provider.yaml b/airflow/providers/databricks/provider.yaml
index ba9b3f0..77ced24 100644
--- a/airflow/providers/databricks/provider.yaml
+++ b/airflow/providers/databricks/provider.yaml
@@ -91,3 +91,6 @@ connection-types:
connection-type: databricks
- hook-class-name: airflow.providers.databricks.hooks.databricks_sql.DatabricksSqlHook
connection-type: databricks
+
+extra-links:
+ - airflow.providers.databricks.operators.databricks.DatabricksJobRunLink
diff --git a/tests/providers/databricks/hooks/test_databricks.py b/tests/providers/databricks/hooks/test_databricks.py
index d1adba0..5a93ed7 100644
--- a/tests/providers/databricks/hooks/test_databricks.py
+++ b/tests/providers/databricks/hooks/test_databricks.py
@@ -779,7 +779,7 @@ class TestRunState(unittest.TestCase):
def test_is_terminal_with_nonexistent_life_cycle_state(self):
run_state = RunState('blah', '', '')
with pytest.raises(AirflowException):
- run_state.is_terminal
+ assert run_state.is_terminal
def test_is_successful(self):
run_state = RunState('TERMINATED', 'SUCCESS', '')