nivangio opened a new issue, #29746:
URL: https://github.com/apache/airflow/issues/29746
### Apache Airflow Provider(s)
databricks
### Versions of Apache Airflow Providers
apache-airflow-providers-databricks==4.0.0
### Apache Airflow version
2.4.3
### Operating System
macOS
### Deployment
Virtualenv installation
### Deployment details
The issue is consistent across multiple Airflow deployments (locally on
Docker Compose, remotely on MWAA in AWS, and locally using virtualenv).
### What happened
Passing the `base_parameters` key of the `notebook_task` parameter of
`DatabricksSubmitRunOperator` as the output of a previous task (TaskFlow
paradigm) does not work.
After inspecting `DatabricksSubmitRunOperator.__init__`, it seems the problem
lies in the fact that it calls
`airflow.providers.databricks.utils.databricks.normalise_json_content` to
validate input parameters and, given that the input parameter is of type
`PlainXComArg`, validation fails and the DAG does not parse.
The workaround I found is to call the operator via `partial` and `expand`,
which is a bit hacky and much less readable.
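
For context, the check in `normalise_json_content` boils down to a recursive
type whitelist roughly like the following (paraphrased for illustration, not
the verbatim provider source); anything outside the whitelisted types,
including an unresolved `PlainXComArg`, raises at DAG-parse time:
```python3
# Paraphrased sketch of the provider's normalise_json_content, for illustration.
from airflow.exceptions import AirflowException

def normalise_json_content(content, json_path: str = "json"):
    if isinstance(content, (str, bool)):
        return content
    if isinstance(content, (int, float)):
        # Databricks expects numeric parameters serialised as strings
        return str(content)
    if isinstance(content, (list, tuple)):
        return [normalise_json_content(e, f"{json_path}[{i}]") for i, e in enumerate(content)]
    if isinstance(content, dict):
        return {k: normalise_json_content(v, f"{json_path}[{k}]") for k, v in content.items()}
    # A PlainXComArg placeholder lands here and blows up DAG parsing
    raise AirflowException(
        f"Type {type(content)} used for parameter {json_path} is not a number or a string"
    )
```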
### What you think should happen instead
`DatabricksSubmitRunOperator` should accept `PlainXComArg` arguments in
`__init__` and defer validation to `execute`, just before submitting the job
run.
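
A minimal sketch of that idea, using a hypothetical helper (this is not
existing provider code): leave unresolved `XComArg` values untouched at
construction time and normalise them in `execute`, after templating has
resolved them to plain values:
```python3
# Hypothetical helper, not existing provider code: preserve unresolved XComArg
# values in __init__ and run the strict normalisation again in execute(), once
# templating has turned them into plain Python values.
from airflow.models.xcom_arg import XComArg
from airflow.providers.databricks.utils.databricks import normalise_json_content

def deferred_normalise_json_content(content, json_path: str = "json"):
    if isinstance(content, XComArg):
        return content  # resolved by templating; re-validated in execute()
    if isinstance(content, dict):
        return {k: deferred_normalise_json_content(v, f"{json_path}[{k}]") for k, v in content.items()}
    if isinstance(content, (list, tuple)):
        return [deferred_normalise_json_content(e, f"{json_path}[{i}]") for i, e in enumerate(content)]
    return normalise_json_content(content, json_path)
```
Containers are recursed, so an `XComArg` nested anywhere inside
`notebook_task` would survive parsing, and `execute` could then re-run the
strict `normalise_json_content` on the fully resolved values.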
### How to reproduce
This DAG fails to parse:
```python3
from airflow import DAG
from airflow.decorators import task
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago

with DAG(
    "dag_erroring",
    start_date=days_ago(1),
    params={"param_1": "", "param_2": ""},
) as dag:

    @task
    def from_dag_params_to_notebook_params(**context):
        # Transform/validate DAG input parameters into what the notebook expects
        notebook_param_1 = context["dag_run"].conf["param_1"] + "abcd"
        notebook_param_2 = context["dag_run"].conf["param_2"] + "efgh"
        return {"some_param": notebook_param_1, "some_other_param": notebook_param_2}

    DatabricksSubmitRunOperator(
        task_id="my_notebook_task",
        new_cluster={
            "cluster_name": "single-node-cluster",
            "spark_version": "7.6.x-scala2.12",
            "node_type_id": "i3.xlarge",
            "num_workers": 0,
            "spark_conf": {
                "spark.databricks.cluster.profile": "singleNode",
                "spark.master": "[*, 4]",
            },
            "custom_tags": {"ResourceClass": "SingleNode"},
        },
        notebook_task={
            "notebook_path": "some/path/to/a/notebook",
            "base_parameters": from_dag_params_to_notebook_params(),
        },
        libraries=[],
        databricks_retry_limit=3,
        timeout_seconds=86400,
        polling_period_seconds=20,
    )
```
This one does not, since `expand()` arguments are only resolved when the
mapped task runs rather than at parse time:
```python3
from airflow import DAG
from airflow.decorators import task
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago

with DAG(
    "dag_parsing_fine",
    start_date=days_ago(1),
    params={"param_1": "", "param_2": ""},
) as dag:

    @task
    def from_dag_params_to_notebook_params(**context):
        # Transform/validate DAG input parameters into what the notebook expects
        notebook_param_1 = context["dag_run"].conf["param_1"] + "abcd"
        notebook_param_2 = context["dag_run"].conf["param_2"] + "efgh"
        return [
            {
                "notebook_path": "some/path/to/a/notebook",
                "base_parameters": {
                    "some_param": notebook_param_1,
                    "some_other_param": notebook_param_2,
                },
            }
        ]

    DatabricksSubmitRunOperator.partial(
        task_id="my_notebook_task",
        new_cluster={
            "cluster_name": "single-node-cluster",
            "spark_version": "7.6.x-scala2.12",
            "node_type_id": "i3.xlarge",
            "num_workers": 0,
            "spark_conf": {
                "spark.databricks.cluster.profile": "singleNode",
                "spark.master": "[*, 4]",
            },
            "custom_tags": {"ResourceClass": "SingleNode"},
        },
        libraries=[],
        databricks_retry_limit=3,
        timeout_seconds=86400,
        polling_period_seconds=20,
    ).expand(notebook_task=from_dag_params_to_notebook_params())
```
### Anything else
_No response_
### Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)