maciej-szuszkiewicz commented on issue #41816:
URL: https://github.com/apache/airflow/issues/41816#issuecomment-2405033993
Hey, I've run into the same issue today. In our case, we use an in-house
DAG factory to generate DAGs from configuration files. This can produce
both long dag ids and long task ids, since the task ids also contain the
names of their task groups.
For example, I have a dag id that's already 81 chars long, and on top of
that, the DatabricksWorkflowTaskGroup is nested in another group.
As a result, the task key generated by
`DatabricksTaskBaseOperator._get_databricks_task_id` is 125 chars long, and I
have no way of shortening it.
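For reference, here is a minimal sketch of how that key ends up over the limit. The helper below paraphrases the `dag_id`/`task_id` concatenation the provider performs (it is not the provider code itself, and the ids are stand-ins):

```python
# Sketch of how the Databricks task key is derived, paraphrasing
# DatabricksTaskBaseOperator._get_databricks_task_id (illustrative only).
def get_databricks_task_id(dag_id: str, task_id: str) -> str:
    # Task group names are already baked into the Airflow task_id,
    # e.g. "outer_group.databricks_workflow.notebook_task".
    return f"{dag_id}__{task_id.replace('.', '__')}"

# With a long dag id and nested groups, the key easily exceeds
# Databricks' 100-character limit on task keys:
dag_id = "x" * 81  # stand-in for an 81-char dag id
task_id = "outer_group.databricks_workflow.notebook_task"
key = get_databricks_task_id(dag_id, task_id)
print(len(key) > 100)  # True
```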
When I try to run this dag, `DatabricksWorkflowTaskGroup.launch` operator
fails with:
```
requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: https://<redacted>.cloud.databricks.com/api/2.1/jobs/create

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 465, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 432, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/baseoperator.py", line 401, in wrapper
    return func(self, *args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/providers/databricks/operators/databricks_workflow.py", line 201, in execute
    job_id = self._create_or_reset_job(context)
  File "/usr/local/lib/python3.9/site-packages/airflow/providers/databricks/operators/databricks_workflow.py", line 178, in _create_or_reset_job
    job_id = self._hook.create_job(job_spec)
  File "/usr/local/lib/python3.9/site-packages/airflow/providers/databricks/hooks/databricks.py", line 226, in create_job
    response = self._do_api_call(CREATE_ENDPOINT, json)
  File "/usr/local/lib/python3.9/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 579, in _do_api_call
    raise AirflowException(msg)
airflow.exceptions.AirflowException: Response: {"error_code":"INVALID_PARAMETER_VALUE","message":"The provided task key (of 125 characters) exceeds the maximum allowed length of 100 characters."}, Status Code: 400
```
I see three options here:
- Let users configure the `task_key` on their own in
`DatabricksTaskBaseOperator`. I was able to set `task_key` via the
`DatabricksTaskOperator.task_config` attribute, and
`DatabricksWorkflowTaskGroup.launch` executed successfully and created a
job in Databricks. However, the execution of that `DatabricksTaskOperator`
then failed, because it looked for a Databricks task with the key generated
by `_get_databricks_task_id`, which didn't exist in that job.
- Remove `dag_id` from `DatabricksTaskBaseOperator._get_databricks_task_id`.
The dag_id adds nothing to the uniqueness of the values returned by
`_get_databricks_task_id`, since it's the same for every task in the dag;
only the task id matters. This is an incomplete fix, though, as it won't
cover all cases: deeply nested groups can still push the Airflow task id
alone past 100 chars.
- Trim the return value of `_get_databricks_task_id` to the last 100
characters, e.g. `return f"{self.dag_id}__{task_id.replace('.', '__')}"[-100:]`.
The resulting keys won't be super pretty for longer ids, but it should do
the trick.
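To make the third option concrete, here is a sketch of such a trim (function and constant names are illustrative, not the provider API). A plain `[-100:]` works but can collide when two long keys share a suffix, so this variant, as an assumption on my part, appends a short hash of the full key to keep trimmed keys unique and deterministic:

```python
import hashlib

# Databricks rejects task keys longer than 100 characters.
MAX_KEY_LEN = 100

def trimmed_task_key(dag_id: str, task_id: str) -> str:
    """Illustrative trim of the generated Databricks task key."""
    key = f"{dag_id}__{task_id.replace('.', '__')}"
    if len(key) <= MAX_KEY_LEN:
        return key
    # Keep the (most readable) tail of the key, and append an 8-char
    # hash of the full key so two keys sharing a suffix don't collide.
    digest = hashlib.sha256(key.encode()).hexdigest()[:8]
    return f"{key[-(MAX_KEY_LEN - 9):]}-{digest}"

short = trimmed_task_key("x" * 81, "outer_group.databricks_workflow.notebook_task")
assert len(short) <= MAX_KEY_LEN
```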