nivangio opened a new issue, #29746:
URL: https://github.com/apache/airflow/issues/29746

   ### Apache Airflow Provider(s)
   
   databricks
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-databricks==4.0.0
   
   ### Apache Airflow version
   
   2.4.3
   
   ### Operating System
   
   macOS
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   The issue is consistent across multiple Airflow deployments (locally on Docker Compose, remotely on MWAA in AWS, locally using virtualenv).
   
   ### What happened
   
   Passing the `base_parameters` key of the `notebook_task` parameter to `DatabricksSubmitRunOperator` as the output of a previous task (TaskFlow paradigm) does not work.
   
   After inspecting `DatabricksSubmitRunOperator.__init__`, it seems the problem lies in the fact that it calls `utils.databricks.normalise_json_content` to validate input parameters; since the input parameter is of type `PlainXComArg`, the DAG fails to parse.
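   To illustrate, here is a minimal, self-contained sketch of the kind of validation `normalise_json_content` performs (the function body and the `FakeXComArg` class below are stand-ins written for illustration, not the actual Airflow code):

```python
# Sketch of normalise-style validation: primitives pass through, containers
# are walked recursively, and any other type raises immediately at parse time.
def normalise(content, path="json"):
    if isinstance(content, (str, bool)):
        return content
    if isinstance(content, (int, float)):
        # numbers are coerced to strings for the JSON payload
        return str(content)
    if isinstance(content, (list, tuple)):
        return [normalise(item, f"{path}[{i}]") for i, item in enumerate(content)]
    if isinstance(content, dict):
        return {key: normalise(value, f"{path}[{key}]") for key, value in content.items()}
    raise TypeError(f"Type {type(content)} used for parameter {path} is not a number or a string")


class FakeXComArg:
    """Stand-in for the lazy XCom reference a TaskFlow task returns at parse time."""


# Plain values validate fine...
normalise({"notebook_path": "some/path", "base_parameters": {"some_param": "abcd"}})

# ...but a value that is still a lazy XCom reference fails before execute() ever runs.
try:
    normalise({"notebook_path": "some/path", "base_parameters": FakeXComArg()})
except TypeError as exc:
    print(exc)
```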
   
   The workaround I found is to call the operator via `partial` and `expand`, which is a bit hacky and much less legible.
   
   ### What you think should happen instead
   
   `DatabricksSubmitRunOperator` should accept `PlainXComArg` arguments at `__init__` time and defer validation to `execute`, just before submitting the job run.
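   As a rough sketch of what that could look like (hypothetical class and helper names, not a patch proposal), validation would simply move from `__init__` to `execute`, after templating has resolved any `XComArg` values:

```python
# Hypothetical sketch: the operator stores the payload untouched at __init__
# and only validates it in execute(), once lazy references are resolved.
def _validate(content):
    if isinstance(content, (str, bool)):
        return content
    if isinstance(content, (int, float)):
        return str(content)
    if isinstance(content, dict):
        return {k: _validate(v) for k, v in content.items()}
    if isinstance(content, (list, tuple)):
        return [_validate(v) for v in content]
    raise TypeError(f"unsupported type: {type(content)}")


class SketchSubmitRunOperator:
    # In real Airflow, fields listed here are rendered (XComArgs resolved)
    # before execute() is called.
    template_fields = ("json",)

    def __init__(self, json):
        # No validation here: the payload may still contain XComArg placeholders.
        self.json = json

    def execute(self):
        # By now templating would have replaced placeholders with real values,
        # so strict validation is safe.
        return _validate(self.json)


op = SketchSubmitRunOperator({"base_parameters": {"some_param": 1}})
print(op.execute())  # numbers normalised to strings, as the real helper does
```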
   
   ### How to reproduce
   
   This DAG fails to parse:
   
   ```python3
   from airflow import DAG
   from airflow.decorators import task
   from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
   from airflow.utils.dates import days_ago

   with DAG(
       "dag_erroring",
       start_date=days_ago(1),
       params={"param_1": "", "param_2": ""},
   ) as dag:
   
       @task
       def from_dag_params_to_notebook_params(**context):
   
           # Transform/Validate DAG input parameters to sth expected by Notebook
           notebook_param_1 = context["dag_run"].conf["param_1"] + "abcd"
           notebook_param_2 = context["dag_run"].conf["param_2"] + "efgh"
   
           return {"some_param": notebook_param_1, "some_other_param": notebook_param_2}
   
       DatabricksSubmitRunOperator(
           task_id="my_notebook_task",
           new_cluster={
               "cluster_name": "single-node-cluster",
               "spark_version": "7.6.x-scala2.12",
               "node_type_id": "i3.xlarge",
               "num_workers": 0,
               "spark_conf": {
                   "spark.databricks.cluster.profile": "singleNode",
                   "spark.master": "[*, 4]",
               },
               "custom_tags": {"ResourceClass": "SingleNode"},
           },
           notebook_task={
               "notebook_path": "some/path/to/a/notebook",
               "base_parameters": from_dag_params_to_notebook_params(),
           },
           libraries=[],
           databricks_retry_limit=3,
           timeout_seconds=86400,
           polling_period_seconds=20,
       )
   ```
   
   This one does not:
   
   ```python3
   from airflow import DAG
   from airflow.decorators import task
   from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
   from airflow.utils.dates import days_ago

   with DAG(
       "dag_parsing_fine",
       start_date=days_ago(1),
       params={"param_1": "", "param_2": ""},
   ) as dag:
   
       @task
       def from_dag_params_to_notebook_params(**context):
   
           # Transform/Validate DAG input parameters to sth expected by Notebook
           notebook_param_1 = context["dag_run"].conf["param_1"] + "abcd"
           notebook_param_2 = context["dag_run"].conf["param_2"] + "efgh"
   
           return [{
               "notebook_path": "some/path/to/a/notebook",
               "base_parameters": {"some_param": notebook_param_1, "some_other_param": notebook_param_2},
           }]
   
       DatabricksSubmitRunOperator.partial(
           task_id="my_notebook_task",
           new_cluster={
               "cluster_name": "single-node-cluster",
               "spark_version": "7.6.x-scala2.12",
               "node_type_id": "i3.xlarge",
               "num_workers": 0,
               "spark_conf": {
                   "spark.databricks.cluster.profile": "singleNode",
                   "spark.master": "[*, 4]",
               },
               "custom_tags": {"ResourceClass": "SingleNode"},
           },
           libraries=[],
           databricks_retry_limit=3,
           timeout_seconds=86400,
           polling_period_seconds=20,
       ).expand(notebook_task=from_dag_params_to_notebook_params())
   
   ```
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

