yuqian90 commented on a change in pull request #12312:
URL: https://github.com/apache/airflow/pull/12312#discussion_r522666351



##########
File path: airflow/operators/python.py
##########
@@ -165,7 +166,7 @@ def __init__(
         multiple_outputs: bool = False,
         **kwargs,
     ) -> None:
-        kwargs['task_id'] = self._get_unique_task_id(task_id, kwargs.get('dag'))
+        kwargs['task_id'] = self._get_unique_task_id(task_id, kwargs.get('dag'), kwargs.get('task_group'))

Review comment:
       Instead of calling `kwargs.get`, I suggest making `dag` and `task_group` 
explicit keyword arguments with defaults, like this:
   
   ```python
       def __init__(
           self,
           *,
           python_callable: Callable,
           task_id: str,
           op_args: Tuple[Any],
           op_kwargs: Dict[str, Any],
           multiple_outputs: bool = False,
           dag=None,
           task_group=None,
           **kwargs,
       ) -> None:
           kwargs['task_id'] = self._get_unique_task_id(task_id, dag, task_group)
           ...
           super().__init__(dag=dag, task_group=task_group, **kwargs)
   ```
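
A minimal sketch of why the explicit keywords also have to be forwarded to the parent: once `dag` and `task_group` are named parameters, Python binds them directly and they no longer appear in `**kwargs`. `BaseSketch` and `DecoratedSketch` are hypothetical stand-ins for illustration, not Airflow classes.

```python
from typing import Any, Optional


class BaseSketch:
    """Hypothetical stand-in for a base operator; not Airflow's BaseOperator."""

    def __init__(self, *, task_id: str, dag: Optional[Any] = None,
                 task_group: Optional[Any] = None) -> None:
        self.task_id = task_id
        self.dag = dag
        self.task_group = task_group


class DecoratedSketch(BaseSketch):
    """Names `dag` and `task_group` explicitly instead of using kwargs.get."""

    def __init__(self, *, task_id: str, dag: Optional[Any] = None,
                 task_group: Optional[Any] = None, **kwargs: Any) -> None:
        # Named parameters are bound by Python and never land in **kwargs.
        assert "dag" not in kwargs and "task_group" not in kwargs
        kwargs["task_id"] = task_id
        # They must therefore be passed on explicitly to the parent.
        super().__init__(dag=dag, task_group=task_group, **kwargs)


op = DecoratedSketch(task_id="t", dag="my_dag", task_group="my_group")
```

Without the explicit `dag=dag, task_group=task_group` in the `super().__init__` call, the parent would fall back to its defaults and silently lose both values.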

##########
File path: airflow/operators/python.py
##########
@@ -190,7 +193,15 @@ def _get_unique_task_id(task_id: str, dag: Optional[DAG] = None) -> str:
           task_id__20
         """
         dag = dag or DagContext.get_current_dag()
-        if not dag or task_id not in dag.task_ids:
+        if not dag:
+            return task_id
+
+        # We need check if we are in context of TaskGroup as the task_id may
+        # already be altered
+        task_group = task_group or TaskGroupContext.get_current_task_group(dag)
+        tg_task_id = task_group.child_id(task_id) if task_group else None
+
+        if (task_id not in dag.task_ids) and (tg_task_id not in dag.task_ids):

Review comment:
       I think this is equivalent and simpler (because tasks not in a 
user-given `TaskGroup` are in the root `TaskGroup` and keep the original `task_id`):
   ```python
           # We need to check if we are in the context of TaskGroup as the task_id may
           # already be altered
           task_group = task_group or TaskGroupContext.get_current_task_group(dag)
           tg_task_id = task_group.child_id(task_id) if task_group else task_id
   
           if tg_task_id not in dag.task_ids:
               return task_id
           ...
   ```
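
To illustrate the reasoning about the root `TaskGroup`, here is a small sketch. `GroupSketch` and `is_unused` are hypothetical stand-ins; the sketch assumes that `child_id` prefixes the label with the group id and that the root group has no group id, so it returns the label unchanged.

```python
from typing import Optional, Set


class GroupSketch:
    """Hypothetical stand-in for TaskGroup; the root group has group_id=None."""

    def __init__(self, group_id: Optional[str] = None) -> None:
        self.group_id = group_id

    def child_id(self, label: str) -> str:
        # Assumption: children of a named group get a "<group_id>." prefix,
        # children of the root group keep their label unchanged.
        return f"{self.group_id}.{label}" if self.group_id else label


def is_unused(task_id: str, task_ids: Set[str],
              task_group: Optional[GroupSketch]) -> bool:
    """The simplified check: only the group-qualified id is looked up."""
    tg_task_id = task_group.child_id(task_id) if task_group else task_id
    return tg_task_id not in task_ids


existing = {"extract", "section.extract"}
root = GroupSketch()
section = GroupSketch("section")
```

Under these assumptions, the qualified id in the root group is just `task_id`, so a single membership test covers the ungrouped case as well as the grouped one.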




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

