This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7d02277ae1 Deprecate databricks async operator (#30761)
7d02277ae1 is described below
commit 7d02277ae13b7d1e6cea9e6c8ff0d411100daf77
Author: Pankaj Singh <[email protected]>
AuthorDate: Sun Apr 23 02:14:35 2023 +0530
Deprecate databricks async operator (#30761)
---
.../providers/databricks/operators/databricks.py | 40 ++++++++++++++++++++--
1 file changed, 38 insertions(+), 2 deletions(-)
diff --git a/airflow/providers/databricks/operators/databricks.py
b/airflow/providers/databricks/operators/databricks.py
index fb9351818c..61384c8015 100644
--- a/airflow/providers/databricks/operators/databricks.py
+++ b/airflow/providers/databricks/operators/databricks.py
@@ -19,6 +19,7 @@
from __future__ import annotations
import time
+import warnings
from logging import Logger
from typing import TYPE_CHECKING, Any, Sequence
@@ -267,6 +268,7 @@ class DatabricksSubmitRunOperator(BaseOperator):
:param do_xcom_push: Whether we should push run_id and run_page_url to
xcom.
:param git_source: Optional specification of a remote git repository from
which
supported task types are retrieved.
+ :param deferrable: Run operator in the deferrable mode.
.. seealso::
https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunsSubmit
@@ -306,6 +308,7 @@ class DatabricksSubmitRunOperator(BaseOperator):
access_control_list: list[dict[str, str]] | None = None,
wait_for_termination: bool = True,
git_source: dict[str, str] | None = None,
+ deferrable: bool = False,
**kwargs,
) -> None:
"""Creates a new ``DatabricksSubmitRunOperator``."""
@@ -317,6 +320,7 @@ class DatabricksSubmitRunOperator(BaseOperator):
self.databricks_retry_delay = databricks_retry_delay
self.databricks_retry_args = databricks_retry_args
self.wait_for_termination = wait_for_termination
+ self.deferrable = deferrable
if tasks is not None:
self.json["tasks"] = tasks
if spark_jar_task is not None:
@@ -373,7 +377,10 @@ class DatabricksSubmitRunOperator(BaseOperator):
def execute(self, context: Context):
json_normalised = normalise_json_content(self.json)
self.run_id = self._hook.submit_run(json_normalised)
- _handle_databricks_operator_execution(self, self._hook, self.log,
context)
+ if self.deferrable:  # deferrable=True routes the submitted run through the deferrable helper
+ _handle_deferrable_databricks_operator_execution(self, self._hook,
self.log, context)
+ else:  # otherwise keep the pre-existing, non-deferrable execution path
+ _handle_databricks_operator_execution(self, self._hook, self.log,
context)
def on_kill(self):
if self.run_id:
@@ -384,10 +391,23 @@ class DatabricksSubmitRunOperator(BaseOperator):
else:
self.log.error("Error: Task: %s with invalid run_id was requested
to be cancelled.", self.task_id)
+ def execute_complete(self, context: dict | None, event: dict):  # NOTE(review): assumed resume callback after deferral — confirm; only ``event`` is used
+ _handle_deferrable_databricks_operator_completion(event, self.log)
+
class DatabricksSubmitRunDeferrableOperator(DatabricksSubmitRunOperator):
"""Deferrable version of ``DatabricksSubmitRunOperator``"""
+ def __init__(self, *args, **kwargs):  # deprecated shim: warn once per instantiation, then force deferrable=True
+ warnings.warn(
+ "`DatabricksSubmitRunDeferrableOperator` has been deprecated. "
+ "Please use `airflow.providers.databricks.operators.databricks.DatabricksSubmitRunOperator` "
+ "with `deferrable=True` instead.",
+ DeprecationWarning,
+ stacklevel=2,  # attribute the warning to the caller constructing the operator
+ )
+ super().__init__(*args, deferrable=True, **kwargs)  # star-args before keywords (bugbear B026); same binding
+
+
def execute(self, context):
hook = self._get_hook(caller="DatabricksSubmitRunDeferrableOperator")
json_normalised = normalise_json_content(self.json)
@@ -549,6 +569,7 @@ class DatabricksRunNowOperator(BaseOperator):
:param databricks_retry_args: An optional dictionary with arguments passed
to ``tenacity.Retrying`` class.
:param do_xcom_push: Whether we should push run_id and run_page_url to
xcom.
:param wait_for_termination: if we should wait for termination of the job
run. ``True`` by default.
+ :param deferrable: Run operator in the deferrable mode.
"""
# Used in airflow.models.BaseOperator
@@ -578,6 +599,7 @@ class DatabricksRunNowOperator(BaseOperator):
databricks_retry_args: dict[Any, Any] | None = None,
do_xcom_push: bool = True,
wait_for_termination: bool = True,
+ deferrable: bool = False,
**kwargs,
) -> None:
"""Creates a new ``DatabricksRunNowOperator``."""
@@ -589,6 +611,7 @@ class DatabricksRunNowOperator(BaseOperator):
self.databricks_retry_delay = databricks_retry_delay
self.databricks_retry_args = databricks_retry_args
self.wait_for_termination = wait_for_termination
+ self.deferrable = deferrable
if job_id is not None:
self.json["job_id"] = job_id
@@ -636,7 +659,10 @@ class DatabricksRunNowOperator(BaseOperator):
self.json["job_id"] = job_id
del self.json["job_name"]
self.run_id = hook.run_now(self.json)
- _handle_databricks_operator_execution(self, hook, self.log, context)
+ if self.deferrable:
+ _handle_deferrable_databricks_operator_execution(self, hook,
self.log, context)
+ else:
+ _handle_databricks_operator_execution(self, hook, self.log,
context)
def on_kill(self):
if self.run_id:
@@ -651,6 +677,16 @@ class DatabricksRunNowOperator(BaseOperator):
class DatabricksRunNowDeferrableOperator(DatabricksRunNowOperator):
"""Deferrable version of ``DatabricksRunNowOperator``"""
+ def __init__(self, *args, **kwargs):  # deprecated shim: warn once per instantiation, then force deferrable=True
+ warnings.warn(
+ "`DatabricksRunNowDeferrableOperator` has been deprecated. "
+ "Please use `airflow.providers.databricks.operators.databricks.DatabricksRunNowOperator` "
+ "with `deferrable=True` instead.",
+ DeprecationWarning,
+ stacklevel=2,  # attribute the warning to the caller constructing the operator
+ )
+ super().__init__(*args, deferrable=True, **kwargs)  # star-args before keywords (bugbear B026); same binding
+
+
def execute(self, context):
hook = self._get_hook(caller="DatabricksRunNowDeferrableOperator")
self.run_id = hook.run_now(self.json)