pacmora opened a new issue, #47614:
URL: https://github.com/apache/airflow/issues/47614

   ### Apache Airflow Provider(s)
   
   databricks
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-databricks==7.2.0
   
   
   ### Apache Airflow version
   
   2.10.5
   
   ### Operating System
   
   Debian Bookworm
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### What happened
   
   When DatabricksWorkflowTaskGroup contains at least one task that has a child 
task, launch task generates wrong json payload. Tasks with parents are setting 
it owns task_key in depends_on field.
   
   
   
   ### What you think should happen instead
   
   Tasks with parent/s should set parent task_key instead of itself task_key
   
   ### How to reproduce
   
   Create DatabricksWorkflowTaskGroup. Then add a task_A >> task_B, 
   
   Task_B will add it's owns task_key in "depends_on" field instead of task_A 
task_key
   
   ### Anything else
   
   I think the issue is located in file 
[airflow/providers/databricks/operators/databricks.py.py](https://github.com/apache/airflow/blob/providers-databricks/7.2.0/providers/databricks/src/airflow/providers/databricks/operators/databricks.py)
   
   The first time _generated_databricks_task_key is executed, even if task_id 
is provided or not, self._databricks_task_key will be set. It means no matter 
how many times "_generate_databricks_task_key", even passing differents task_id 
param, is called, it always will return the same value, the one returned in the 
first call.
   
   ```
   
   def _generate_databricks_task_key(self, task_id: str | None = None) -> str:
       """Create a databricks task key using the hash of dag_id and task_id."""
       if not self._databricks_task_key or len(self._databricks_task_key) > 100:
           self.log.info(
               "databricks_task_key has not be provided or the provided one 
exceeds 100 characters and will be truncated by the Databricks API. This will 
cause failure when trying to monitor the task. A task_key will be generated 
using the hash value of dag_id+task_id"
           )
           task_id = task_id or self.task_id
           task_key = f"{self.dag_id}__{task_id}".encode()
           self._databricks_task_key = hashlib.md5(task_key).hexdigest()
           self.log.info("Generated databricks task_key: %s", 
self._databricks_task_key)
       return self._databricks_task_key
   ```
   
   If we check block of code that converts a task into databricks_task, is 
setting "task_key" with self.databricks_task_key. At this point if 
_generate_databricks_task_key is called again, with or without task_id param, 
it will return always the same value due self._databricks_task_key is not 
"None" anymore.
   ```
   def _convert_to_databricks_workflow_task(
       self, relevant_upstreams: list[BaseOperator], context: Context | None = 
None
   ) -> dict[str, object]:
       """Convert the operator to a Databricks workflow task that can be a task 
in a workflow."""
       base_task_json = self._get_task_base_json()
       result = {
           "task_key": self.databricks_task_key,
           "depends_on": [
               {"task_key": self._generate_databricks_task_key(task_id)}
               for task_id in self.upstream_task_ids
               if task_id in relevant_upstreams
           ],
           **base_task_json,
       }
   
       if self.existing_cluster_id and self.job_cluster_key:
           raise ValueError(
               "Both existing_cluster_id and job_cluster_key are set. Only one 
can be set per task."
           )
       if self.existing_cluster_id:
           result["existing_cluster_id"] = self.existing_cluster_id
       elif self.job_cluster_key:
           result["job_cluster_key"] = self.job_cluster_key
   
       return result
   ```
   
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to