vatsrahul1001 opened a new issue, #40788:
URL: https://github.com/apache/airflow/issues/40788
### Apache Airflow version
main (development)
### If "Other Airflow 2 version" selected, which one?
_No response_
### What happened?
`DatabricksRunNowOperator` started failing with the error below after upgrading to provider version `6.7.0`.
```
[2024-07-15, 05:29:05 UTC] {taskinstance.py:2905} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 563, in _do_api_call
    for attempt in self._get_retry_object():
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 435, in __iter__
    do = self.iter(retry_state=retry_state)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 368, in iter
    result = action(retry_state)
             ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 390, in <lambda>
    self._add_action_func(lambda rs: rs.outcome.result())
                                     ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 573, in _do_api_call
    response.raise_for_status()
  File "/usr/local/lib/python3.11/site-packages/requests/models.py", line 1021, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: https://adb-2703548196728655.15.azuredatabricks.net/api/2.1/jobs/run-now

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 460, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 432, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 401, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/databricks/operators/databricks.py", line 862, in execute
    self.run_id = hook.run_now(self.json)
                  ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/databricks/hooks/databricks.py", line 243, in run_now
    response = self._do_api_call(RUN_NOW_ENDPOINT, json)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 580, in _do_api_call
    raise AirflowException(msg)
airflow.exceptions.AirflowException: Response: {"error_code":"INVALID_PARAMETER_VALUE","message":"Job 0 does not exist."}, Status Code: 400
```
I have verified that the same DAG works with provider version 6.6.0.
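To help narrow it down, here is a minimal isolation sketch (assumptions on my part: the hypothetical job ID `123` and the default `databricks_default` connection, neither taken from the failing DAG). The traceback ends in `hook.run_now(self.json)`, so if this direct hook call succeeds on 6.7.0 while the operator fails, the regression would sit in `DatabricksRunNowOperator`'s handling of `job_id` rather than in the hook itself:

```
from airflow.providers.databricks.hooks.databricks import DatabricksHook

# Hypothetical values: replace 123 with a job ID that exists in your workspace.
hook = DatabricksHook(databricks_conn_id="databricks_default")

# Same payload shape the operator builds. The failing run reports
# "Job 0 does not exist", which suggests the templated job_id reaches
# the API as 0 on 6.7.0.
run_id = hook.run_now({"job_id": 123, "notebook_params": {"Variable": "5"}})
print(run_id)
```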
### What you think should happen instead?
_No response_
### How to reproduce
1. Try to run the DAG below with databricks provider 6.7.0:
```
import json
import os
from datetime import timedelta
from typing import Dict, Optional

from airflow.models.dag import DAG
from airflow.utils.timezone import datetime
from airflow.providers.databricks.operators.databricks import (
    DatabricksRunNowOperator,
    DatabricksSubmitRunOperator,
)

DATABRICKS_CONN_ID = os.getenv("ASTRO_DATABRICKS_CONN_ID", "databricks_default")

# Notebook path as a JSON object
notebook_task = '{"notebook_path": "/Users/x/quick_start"}'
NOTEBOOK_TASK = json.loads(os.getenv("DATABRICKS_NOTEBOOK_TASK", notebook_task))
notebook_params: Optional[Dict[str, str]] = {"Variable": "5"}

EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))

default_args = {
    "execution_timeout": timedelta(hours=EXECUTION_TIMEOUT),
    "retries": int(os.getenv("DEFAULT_TASK_RETRIES", 2)),
    "retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))),
}

new_cluster = {
    "num_workers": 1,
    "spark_version": "10.4.x-scala2.12",
    "spark_conf": {},
    "azure_attributes": {
        "availability": "ON_DEMAND_AZURE",
        "spot_bid_max_price": -1,
    },
    "node_type_id": "Standard_D3_v2",
    "ssh_public_keys": [],
    "custom_tags": {},
    "spark_env_vars": {"PYSPARK_PYTHON": "/databricks/python3/bin/python3"},
    "cluster_source": "JOB",
    "init_scripts": [],
}

with DAG(
    dag_id="example_async_databricks",
    start_date=datetime(2022, 1, 1),
    schedule=None,
    catchup=False,
    default_args=default_args,
    tags=["example", "async", "databricks"],
) as dag:
    # [START howto_operator_databricks_submit_run_async]
    opr_submit_run = DatabricksSubmitRunOperator(
        task_id="submit_run",
        databricks_conn_id=DATABRICKS_CONN_ID,
        new_cluster=new_cluster,
        notebook_task=NOTEBOOK_TASK,
        do_xcom_push=True,
        deferrable=True,
    )
    # [END howto_operator_databricks_submit_run_async]

    # [START howto_operator_databricks_run_now_async]
    opr_run_now = DatabricksRunNowOperator(
        task_id="run_now",
        databricks_conn_id=DATABRICKS_CONN_ID,
        job_id="{{ task_instance.xcom_pull(task_ids='submit_run', dag_id='example_async_databricks', key='job_id') }}",
        notebook_params=notebook_params,
        deferrable=True,
    )
    # [END howto_operator_databricks_run_now_async]

    opr_submit_run >> opr_run_now
```
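The failure seems tied to the templated `job_id` string in `opr_run_now` above. As a control case (hypothetical job ID, not from the original DAG), a hard-coded integer `job_id` can be swapped in to check whether the literal path still works on 6.7.0:

```
# Control case: swap in for opr_run_now above.
opr_run_now_literal = DatabricksRunNowOperator(
    task_id="run_now_literal",
    databricks_conn_id=DATABRICKS_CONN_ID,
    job_id=123,  # hypothetical: replace with an existing job ID in your workspace
    notebook_params=notebook_params,
    deferrable=True,
)
```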
### Operating System
Linux
### Versions of Apache Airflow Providers
apache-airflow-providers-databricks 6.7.0
### Deployment
Astronomer
### Deployment details
_No response_
### Anything else?
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)