pacmora opened a new issue, #47614: URL: https://github.com/apache/airflow/issues/47614
### Apache Airflow Provider(s)

databricks

### Versions of Apache Airflow Providers

apache-airflow-providers-databricks==7.2.0

### Apache Airflow version

2.10.5

### Operating System

Debian Bookworm

### Deployment

Official Apache Airflow Helm Chart

### Deployment details

_No response_

### What happened

When a DatabricksWorkflowTaskGroup contains at least one task that has a child task, the launch task generates a wrong JSON payload: tasks with parents put their own task_key in the "depends_on" field.

### What you think should happen instead

Tasks with parents should put the parent's task_key in "depends_on", not their own.

### How to reproduce

Create a DatabricksWorkflowTaskGroup and add `task_A >> task_B`. task_B will add its own task_key in the "depends_on" field instead of task_A's task_key.

### Anything else

I think the issue is located in [airflow/providers/databricks/operators/databricks.py](https://github.com/apache/airflow/blob/providers-databricks/7.2.0/providers/databricks/src/airflow/providers/databricks/operators/databricks.py).

The first time _generate_databricks_task_key is executed, self._databricks_task_key is set regardless of whether a task_id is passed. As a result, no matter how many times _generate_databricks_task_key is called afterwards, even with different task_id arguments, it always returns the value computed on the first call.

```
def _generate_databricks_task_key(self, task_id: str | None = None) -> str:
    """Create a databricks task key using the hash of dag_id and task_id."""
    if not self._databricks_task_key or len(self._databricks_task_key) > 100:
        self.log.info(
            "databricks_task_key has not be provided or the provided one exceeds 100 characters and will be truncated by the Databricks API. This will cause failure when trying to monitor the task. A task_key will be generated using the hash value of dag_id+task_id"
        )
        task_id = task_id or self.task_id
        task_key = f"{self.dag_id}__{task_id}".encode()
        self._databricks_task_key = hashlib.md5(task_key).hexdigest()
        self.log.info("Generated databricks task_key: %s", self._databricks_task_key)
    return self._databricks_task_key
```

The block of code that converts a task into a Databricks workflow task sets "task_key" from self.databricks_task_key. At that point, if _generate_databricks_task_key is called again, with or without a task_id argument, it always returns the same value because self._databricks_task_key is no longer None. (A rough sketch of one possible fix is at the end of this issue.)

```
def _convert_to_databricks_workflow_task(
    self, relevant_upstreams: list[BaseOperator], context: Context | None = None
) -> dict[str, object]:
    """Convert the operator to a Databricks workflow task that can be a task in a workflow."""
    base_task_json = self._get_task_base_json()
    result = {
        "task_key": self.databricks_task_key,
        "depends_on": [
            {"task_key": self._generate_databricks_task_key(task_id)}
            for task_id in self.upstream_task_ids
            if task_id in relevant_upstreams
        ],
        **base_task_json,
    }

    if self.existing_cluster_id and self.job_cluster_key:
        raise ValueError(
            "Both existing_cluster_id and job_cluster_key are set. Only one can be set per task."
        )
    if self.existing_cluster_id:
        result["existing_cluster_id"] = self.existing_cluster_id
    elif self.job_cluster_key:
        result["job_cluster_key"] = self.job_cluster_key

    return result
```

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
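A rough sketch of one possible fix, assuming the cached self._databricks_task_key is only meant to be reused for this operator's own key. This is not a tested patch, just an illustration of the idea: compute keys for explicit task_id arguments (the upstream tasks in "depends_on") without touching the cache.

```
def _generate_databricks_task_key(self, task_id: str | None = None) -> str:
    """Create a databricks task key using the hash of dag_id and task_id."""
    # When an explicit task_id is passed (e.g. an upstream task for "depends_on"),
    # compute its key directly and leave the cache alone, so the parent's key is
    # returned instead of this task's own key.
    if task_id is not None and task_id != self.task_id:
        return hashlib.md5(f"{self.dag_id}__{task_id}".encode()).hexdigest()

    if not self._databricks_task_key or len(self._databricks_task_key) > 100:
        self.log.info(
            "databricks_task_key has not been provided or exceeds 100 characters; "
            "a task_key will be generated using the hash of dag_id + task_id."
        )
        task_key = f"{self.dag_id}__{self.task_id}".encode()
        self._databricks_task_key = hashlib.md5(task_key).hexdigest()
        self.log.info("Generated databricks task_key: %s", self._databricks_task_key)
    return self._databricks_task_key
```

With a change along these lines, _convert_to_databricks_workflow_task would put task_A's hashed key into task_B's "depends_on" entry instead of task_B's own key.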
