sushil-louisa opened a new issue, #42438: URL: https://github.com/apache/airflow/issues/42438
### Description

It would be great if `CreateDatabricksWorkflowOperator` could support Jinja templating for the `job_clusters` attribute. This would allow us to dynamically render cluster configuration at runtime based on the context, such as Spark environment variables or Spark configurations.

https://github.com/apache/airflow/blob/e0bddbc438872277ca8de7fb794285cf546ddc0b/airflow/providers/databricks/operators/databricks_workflow.py#L96

### Use case/motivation

I have an Airflow DAG in which I am trying to use the output of an upstream task in the `job_clusters` attribute while defining the `DatabricksWorkflowTaskGroup`. In the current implementation, the output of the upstream task doesn't get Jinja-rendered at runtime.

**Sample:**

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.databricks.operators.databricks import DatabricksTaskOperator
from airflow.providers.databricks.operators.databricks_workflow import (
    DatabricksWorkflowTaskGroup,
)

dag = DAG(
    dag_id="example_databricks_workflow",
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
    tags=["example", "databricks"],
)

with dag:
    fetch_config = PythonOperator(
        task_id="fetch_config",
        python_callable=fetch_config_func,
    )

    task_group = DatabricksWorkflowTaskGroup(
        group_id="test_workflow",
        job_clusters=[
            {
                "job_cluster_key": "Shared_job_cluster",
                "new_cluster": {
                    "cluster_name": "",
                    "spark_version": "11.3.x-scala2.12",
                    "aws_attributes": { ... },
                    "node_type_id": "i3.xlarge",
                    # Pass output of fetch_config task as spark env variables.
                    "spark_env_vars": {
                        "PYSPARK_PYTHON": "/databricks/python3/bin/python3",
                        "CONFIG": f"{fetch_config.output}",
                    },
                    "enable_elastic_disk": False,
                    "data_security_mode": "LEGACY_SINGLE_USER_STANDARD",
                    "runtime_engine": "STANDARD",
                    "num_workers": 8,
                },
            }
        ],
    )
    with task_group:
        task_operator_nb_1 = DatabricksTaskOperator(
            task_id="nb_1",
            databricks_conn_id="databricks_conn",
            job_cluster_key="Shared_job_cluster",
            task_config={
                "notebook_task": {
                    "notebook_path": "/Shared/Notebook_1",
                    "source": "WORKSPACE",
                },
                "libraries": [
                    {"pypi": {"package": "Faker"}},
                    {"pypi": {"package": "simplejson"}},
                ],
            },
        )
        sql_query = DatabricksTaskOperator(
            task_id="sql_query",
            databricks_conn_id="databricks_conn",
            task_config={
                "sql_task": {
                    "query": {
                        "query_id": QUERY_ID,
                    },
                    "warehouse_id": WAREHOUSE_ID,
                }
            },
        )
        task_operator_nb_1 >> sql_query

    fetch_config >> task_group
```

### Related issues

_No response_

### Are you willing to submit a PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
