vatsrahul1001 opened a new issue, #40788:
URL: https://github.com/apache/airflow/issues/40788

   ### Apache Airflow version
   
   main (development)
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   DatabricksRunNowOperator started failing after upgrading to version `6.7.0` of the Databricks provider, with the error below:
   ```
   [2024-07-15, 05:29:05 UTC] {taskinstance.py:2905} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/usr/local/lib/python3.11/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 563, in _do_api_call
       for attempt in self._get_retry_object():
     File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 435, in __iter__
       do = self.iter(retry_state=retry_state)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 368, in iter
       result = action(retry_state)
                ^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 390, in <lambda>
       self._add_action_func(lambda rs: rs.outcome.result())
                                        ^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
       return self.__get_result()
              ^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
       raise self._exception
     File "/usr/local/lib/python3.11/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 573, in _do_api_call
       response.raise_for_status()
     File "/usr/local/lib/python3.11/site-packages/requests/models.py", line 1021, in raise_for_status
       raise HTTPError(http_error_msg, response=self)
   requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: https://adb-2703548196728655.15.azuredatabricks.net/api/2.1/jobs/run-now

   During handling of the above exception, another exception occurred:

   Traceback (most recent call last):
     File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 460, in _execute_task
       result = _execute_callable(context=context, **execute_callable_kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 432, in _execute_callable
       return execute_callable(context=context, **execute_callable_kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 401, in wrapper
       return func(self, *args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.11/site-packages/airflow/providers/databricks/operators/databricks.py", line 862, in execute
       self.run_id = hook.run_now(self.json)
                     ^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.11/site-packages/airflow/providers/databricks/hooks/databricks.py", line 243, in run_now
       response = self._do_api_call(RUN_NOW_ENDPOINT, json)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.11/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 580, in _do_api_call
       raise AirflowException(msg)
   airflow.exceptions.AirflowException: Response: {"error_code":"INVALID_PARAMETER_VALUE","message":"Job 0 does not exist."}, Status Code: 400
   ```
   
   I have verified that the same DAG works with provider version 6.6.0.
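
   To confirm which provider version a given environment is actually running, a quick stdlib check is possible (the distribution name below is the provider's standard PyPI name):

   ```python
   from importlib.metadata import PackageNotFoundError, version

   # Standard PyPI distribution name for the Databricks provider
   pkg = "apache-airflow-providers-databricks"
   try:
       # e.g. "6.7.0" on the failing deployment, "6.6.0" on the working one
       print(f"{pkg}=={version(pkg)}")
   except PackageNotFoundError:
       print(f"{pkg} is not installed in this environment")
   ```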
   
   ### What you think should happen instead?
   
   _No response_
   
   ### How to reproduce
   
   1. Run the DAG below with Databricks provider 6.7.0:

   ```
   import json
   import os
   from datetime import timedelta
   from typing import Dict, Optional

   from airflow.models.dag import DAG
   from airflow.utils.timezone import datetime

   from airflow.providers.databricks.operators.databricks import (
       DatabricksRunNowOperator,
       DatabricksSubmitRunOperator,
   )

   DATABRICKS_CONN_ID = os.getenv("ASTRO_DATABRICKS_CONN_ID", "databricks_default")
   # Notebook path as a JSON object
   notebook_task = '{"notebook_path": "/Users/x/quick_start"}'
   NOTEBOOK_TASK = json.loads(os.getenv("DATABRICKS_NOTEBOOK_TASK", notebook_task))
   notebook_params: Optional[Dict[str, str]] = {"Variable": "5"}
   EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))

   default_args = {
       "execution_timeout": timedelta(hours=EXECUTION_TIMEOUT),
       "retries": int(os.getenv("DEFAULT_TASK_RETRIES", 2)),
       "retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))),
   }
   
   new_cluster = {
       "num_workers": 1,
       "spark_version": "10.4.x-scala2.12",
       "spark_conf": {},
       "azure_attributes": {
           "availability": "ON_DEMAND_AZURE",
           "spot_bid_max_price": -1,
       },
       "node_type_id": "Standard_D3_v2",
       "ssh_public_keys": [],
       "custom_tags": {},
       "spark_env_vars": {"PYSPARK_PYTHON": "/databricks/python3/bin/python3"},
       "cluster_source": "JOB",
       "init_scripts": [],
   }
   
   
   with DAG(
       dag_id="example_async_databricks",
       start_date=datetime(2022, 1, 1),
       schedule=None,
       catchup=False,
       default_args=default_args,
       tags=["example", "async", "databricks"],
   ) as dag:
       # [START howto_operator_databricks_submit_run_async]
       opr_submit_run = DatabricksSubmitRunOperator(
           task_id="submit_run",
           databricks_conn_id=DATABRICKS_CONN_ID,
           new_cluster=new_cluster,
           notebook_task=NOTEBOOK_TASK,
           do_xcom_push=True,
           deferrable=True
       )
       # [END howto_operator_databricks_submit_run_async]
   
       # [START howto_operator_databricks_run_now_async]
       opr_run_now = DatabricksRunNowOperator(
           task_id="run_now",
           databricks_conn_id=DATABRICKS_CONN_ID,
           job_id="{{ task_instance.xcom_pull(task_ids='submit_run', dag_id='example_async_databricks', key='job_id') }}",
           notebook_params=notebook_params,
           deferrable=True
       )
       # [END howto_operator_databricks_run_now_async]
   
   opr_submit_run >> opr_run_now
   ```
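
   Note that `job_id` above is a Jinja template string, so the operator only sees a numeric value after Airflow renders it; the `Job 0 does not exist` error suggests a raw or mis-rendered value is reaching the API. A minimal plain-Python sketch of that distinction (the rendered value `1017` is hypothetical):

   ```python
   # Raw value as written in the DAG file: a Jinja template, not a number.
   job_id_template = (
       "{{ task_instance.xcom_pull(task_ids='submit_run', "
       "dag_id='example_async_databricks', key='job_id') }}"
   )

   # Hypothetical value after Airflow's template rendering.
   rendered_job_id = "1017"

   assert not job_id_template.isdigit()  # the raw template cannot serve as a job id
   assert int(rendered_job_id) == 1017   # only the rendered value is numeric
   print("template:", job_id_template)
   print("rendered:", rendered_job_id)
   ```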
   
   
   ### Operating System
   
   Linux
   
   ### Versions of Apache Airflow Providers
   
   databricks 6.7.0
   
   ### Deployment
   
   Astronomer
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

