This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7d02277ae1 Deprecate databricks async operator (#30761)
7d02277ae1 is described below

commit 7d02277ae13b7d1e6cea9e6c8ff0d411100daf77
Author: Pankaj Singh <[email protected]>
AuthorDate: Sun Apr 23 02:14:35 2023 +0530

    Deprecate databricks async operator (#30761)
---
 .../providers/databricks/operators/databricks.py   | 40 ++++++++++++++++++++--
 1 file changed, 38 insertions(+), 2 deletions(-)

diff --git a/airflow/providers/databricks/operators/databricks.py b/airflow/providers/databricks/operators/databricks.py
index fb9351818c..61384c8015 100644
--- a/airflow/providers/databricks/operators/databricks.py
+++ b/airflow/providers/databricks/operators/databricks.py
@@ -19,6 +19,7 @@
 from __future__ import annotations
 
 import time
+import warnings
 from logging import Logger
 from typing import TYPE_CHECKING, Any, Sequence
 
@@ -267,6 +268,7 @@ class DatabricksSubmitRunOperator(BaseOperator):
     :param do_xcom_push: Whether we should push run_id and run_page_url to xcom.
     :param git_source: Optional specification of a remote git repository from which
         supported task types are retrieved.
+    :param deferrable: Run operator in the deferrable mode.
 
         .. seealso::
             https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunsSubmit
@@ -306,6 +308,7 @@ class DatabricksSubmitRunOperator(BaseOperator):
         access_control_list: list[dict[str, str]] | None = None,
         wait_for_termination: bool = True,
         git_source: dict[str, str] | None = None,
+        deferrable: bool = False,
         **kwargs,
     ) -> None:
         """Creates a new ``DatabricksSubmitRunOperator``."""
@@ -317,6 +320,7 @@ class DatabricksSubmitRunOperator(BaseOperator):
         self.databricks_retry_delay = databricks_retry_delay
         self.databricks_retry_args = databricks_retry_args
         self.wait_for_termination = wait_for_termination
+        self.deferrable = deferrable
         if tasks is not None:
             self.json["tasks"] = tasks
         if spark_jar_task is not None:
@@ -373,7 +377,10 @@ class DatabricksSubmitRunOperator(BaseOperator):
     def execute(self, context: Context):
         json_normalised = normalise_json_content(self.json)
         self.run_id = self._hook.submit_run(json_normalised)
-        _handle_databricks_operator_execution(self, self._hook, self.log, context)
+        if self.deferrable:
+            _handle_deferrable_databricks_operator_execution(self, self._hook, self.log, context)
+        else:
+            _handle_databricks_operator_execution(self, self._hook, self.log, context)
 
     def on_kill(self):
         if self.run_id:
@@ -384,10 +391,23 @@ class DatabricksSubmitRunOperator(BaseOperator):
         else:
             self.log.error("Error: Task: %s with invalid run_id was requested to be cancelled.", self.task_id)
 
+    def execute_complete(self, context: dict | None, event: dict):
+        _handle_deferrable_databricks_operator_completion(event, self.log)
+
 
 class DatabricksSubmitRunDeferrableOperator(DatabricksSubmitRunOperator):
     """Deferrable version of ``DatabricksSubmitRunOperator``"""
 
+    def __init__(self, *args, **kwargs):
+        warnings.warn(
+            "`DatabricksSubmitRunDeferrableOperator` has been deprecated. "
+            "Please use `airflow.providers.databricks.operators.DatabricksSubmitRunOperator` with "
+            "`deferrable=True` instead.",
+            DeprecationWarning,
+            stacklevel=2,
+        )
+        super().__init__(deferrable=True, *args, **kwargs)
+
     def execute(self, context):
         hook = self._get_hook(caller="DatabricksSubmitRunDeferrableOperator")
         json_normalised = normalise_json_content(self.json)
@@ -549,6 +569,7 @@ class DatabricksRunNowOperator(BaseOperator):
     :param databricks_retry_args: An optional dictionary with arguments passed to ``tenacity.Retrying`` class.
     :param do_xcom_push: Whether we should push run_id and run_page_url to xcom.
     :param wait_for_termination: if we should wait for termination of the job run. ``True`` by default.
+    :param deferrable: Run operator in the deferrable mode.
     """
 
     # Used in airflow.models.BaseOperator
@@ -578,6 +599,7 @@ class DatabricksRunNowOperator(BaseOperator):
         databricks_retry_args: dict[Any, Any] | None = None,
         do_xcom_push: bool = True,
         wait_for_termination: bool = True,
+        deferrable: bool = False,
         **kwargs,
     ) -> None:
         """Creates a new ``DatabricksRunNowOperator``."""
@@ -589,6 +611,7 @@ class DatabricksRunNowOperator(BaseOperator):
         self.databricks_retry_delay = databricks_retry_delay
         self.databricks_retry_args = databricks_retry_args
         self.wait_for_termination = wait_for_termination
+        self.deferrable = deferrable
 
         if job_id is not None:
             self.json["job_id"] = job_id
@@ -636,7 +659,10 @@ class DatabricksRunNowOperator(BaseOperator):
             self.json["job_id"] = job_id
             del self.json["job_name"]
         self.run_id = hook.run_now(self.json)
-        _handle_databricks_operator_execution(self, hook, self.log, context)
+        if self.deferrable:
+            _handle_deferrable_databricks_operator_execution(self, hook, self.log, context)
+        else:
+            _handle_databricks_operator_execution(self, hook, self.log, context)
 
     def on_kill(self):
         if self.run_id:
@@ -651,6 +677,16 @@ class DatabricksRunNowOperator(BaseOperator):
 class DatabricksRunNowDeferrableOperator(DatabricksRunNowOperator):
     """Deferrable version of ``DatabricksRunNowOperator``"""
 
+    def __init__(self, *args, **kwargs):
+        warnings.warn(
+            "`DatabricksRunNowDeferrableOperator` has been deprecated. "
+            "Please use `airflow.providers.databricks.operators.DatabricksRunNowOperator` with "
+            "`deferrable=True` instead.",
+            DeprecationWarning,
+            stacklevel=2,
+        )
+        super().__init__(deferrable=True, *args, **kwargs)
+
     def execute(self, context):
         hook = self._get_hook(caller="DatabricksRunNowDeferrableOperator")
         self.run_id = hook.run_now(self.json)

Reply via email to