pankajastro commented on code in PR #40013:
URL: https://github.com/apache/airflow/pull/40013#discussion_r1624117127


##########
airflow/providers/databricks/operators/databricks.py:
##########
@@ -1178,10 +1118,250 @@ def execute(self, context: Context) -> None:
             self.databricks_run_id = workflow_run_metadata.run_id
             self.databricks_conn_id = workflow_run_metadata.conn_id
         else:
-            self.launch_notebook_job()
+            self.launch_job()
         if self.wait_for_termination:
             self.monitor_databricks_job()
 
     def execute_complete(self, context: dict | None, event: dict) -> None:
         run_state = RunState.from_json(event["run_state"])
         self._handle_terminal_run_state(run_state)
+
+
+class DatabricksNotebookOperator(DatabricksTaskBaseOperator):
+    """
+    Runs a notebook on Databricks using an Airflow operator.
+
+    The DatabricksNotebookOperator allows users to launch and monitor notebook job runs on Databricks as
+    Airflow tasks. It can be used as a part of a DatabricksWorkflowTaskGroup to take advantage of job
+    clusters, which allows users to run their tasks on cheaper clusters that can be shared between tasks.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:DatabricksNotebookOperator`
+
+    :param notebook_path: The path to the notebook in Databricks.
+    :param source: Optional location type of the notebook. When set to WORKSPACE, the notebook will be retrieved
+            from the local Databricks workspace. When set to GIT, the notebook will be retrieved from a Git repository
+            defined in git_source. If the value is empty, the task will use GIT if git_source is defined
+            and WORKSPACE otherwise. For more information please visit
+            https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsCreate
+    :param databricks_conn_id: The name of the Airflow connection to use.
+    :param databricks_retry_args: An optional dictionary with arguments passed to ``tenacity.Retrying`` class.
+    :param databricks_retry_delay: Number of seconds to wait between retries.
+    :param databricks_retry_limit: Amount of times to retry if the Databricks backend is unreachable.
+    :param deferrable: Whether to run the operator in the deferrable mode.
+    :param existing_cluster_id: ID for existing cluster on which to run this task.
+    :param job_cluster_key: The key for the job cluster.
+    :param new_cluster: Specs for a new cluster on which this task will be run.
+    :param notebook_packages: A list of the Python libraries to be installed on the cluster running the
+        notebook.
+    :param notebook_params: A dict of key-value pairs to be passed as optional params to the notebook task.
+    :param polling_period_seconds: Controls the rate which we poll for the result of this notebook job run.
+    :param wait_for_termination: if we should wait for termination of the job run. ``True`` by default.
+    """
+
+    template_fields = (
+        "notebook_params",
+        "workflow_run_metadata",
+    )
+    CALLER = "DatabricksNotebookOperator"
+
+    def __init__(
+        self,
+        notebook_path: str,
+        source: str,
+        databricks_conn_id: str = "databricks_default",
+        databricks_retry_args: dict[Any, Any] | None = None,
+        databricks_retry_delay: int = 1,
+        databricks_retry_limit: int = 3,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
+        existing_cluster_id: str = "",
+        job_cluster_key: str = "",
+        new_cluster: dict[str, Any] | None = None,
+        notebook_packages: list[dict[str, Any]] | None = None,
+        notebook_params: dict | None = None,
+        polling_period_seconds: int = 5,
+        wait_for_termination: bool = True,
+        workflow_run_metadata: dict | None = None,
+        **kwargs: Any,
+    ):
+        self.notebook_path = notebook_path
+        self.source = source
+        self.databricks_conn_id = databricks_conn_id
+        self.databricks_retry_args = databricks_retry_args
+        self.databricks_retry_delay = databricks_retry_delay
+        self.databricks_retry_limit = databricks_retry_limit
+        self.databricks_run_id: int | None = None
+        self.deferrable = deferrable

Review Comment:
   Wondering if we can avoid setting these params here, since you are setting them in the super class anyway?
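
   As a sketch of what that could look like (assuming ``DatabricksTaskBaseOperator.__init__`` accepts these parameters, which the quoted hunk does not show), the subclass would keep only its own state and forward the shared arguments:

   ```python
   # Sketch only: forwards the shared parameters to the base class instead of
   # re-assigning them in the subclass. Assumes DatabricksTaskBaseOperator
   # (defined earlier in this same module) has an __init__ that accepts these
   # keyword arguments, which is outside the quoted hunk.
   from __future__ import annotations

   from typing import Any


   class DatabricksNotebookOperator(DatabricksTaskBaseOperator):
       def __init__(
           self,
           notebook_path: str,
           source: str,
           notebook_packages: list[dict[str, Any]] | None = None,
           notebook_params: dict | None = None,
           **kwargs: Any,
       ):
           # Subclass-specific state stays here.
           self.notebook_path = notebook_path
           self.source = source
           self.notebook_packages = notebook_packages or []
           self.notebook_params = notebook_params or {}
           # Shared settings (conn id, retry args, cluster fields, deferrable,
           # polling, wait_for_termination) flow through **kwargs to the base class.
           super().__init__(**kwargs)
   ```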


