henry3260 commented on code in PR #59104:
URL: https://github.com/apache/airflow/pull/59104#discussion_r2649331069


##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -1896,6 +1896,32 @@ def xcom_pull(
             ).first()
             if first is None:  # No matching XCom at all.
                 return default
+
+            # Check if the task is actually mapped
+            target_task_id = task_ids if isinstance(task_ids, str) else first.task_id
+            is_actually_mapped = False
+
+            try:
+                # Get the task definition from the DAG
+                if self.task and self.task.dag:
+                    dag = self.task.dag
+                    if dag.has_task(target_task_id):
+                        target_task = dag.task_dict[target_task_id]
+                        is_actually_mapped = getattr(target_task, "is_mapped", False)

Review Comment:
   > It's better to use the `is_mapped` function defined in `mappedoperator.py`.
   
   I agree that using the `is_mapped()` function is the better approach architecturally.
   
   However, I'm currently facing a module import issue in the CI/CD environment:
   
   **The Problem:**
   - CI tests fail with: `ModuleNotFoundError: No module named 'airflow.models.mappedoperator'`
   - Affected tests: `test_xcom_push_flag`, `test_xcom_pull_unmapped_task`, `test_xcom_pull_mapped_task`
   
   **Root Cause:**
   The `airflow.models.mappedoperator` module imports from `airflow.sdk.definitions.mappedoperator`, but the `task-sdk` package is not properly installed/accessible in the CI test environment.
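
One way to check the root cause above before settling on a workaround is to probe whether the two modules can be located in the CI environment. The snippet below is only a diagnostic sketch: `module_available` is a hypothetical helper name, and it relies solely on the standard-library `importlib.util.find_spec`.

```python
import importlib.util


def module_available(name: str) -> bool:
    """Return True if a module spec for `name` can be found."""
    try:
        # For a dotted name, find_spec imports the parent packages first.
        return importlib.util.find_spec(name) is not None
    except ImportError:
        # Raised (as ModuleNotFoundError) when a parent package is missing.
        return False


# Module paths taken from the error message and root-cause note above.
for name in (
    "airflow.models.mappedoperator",
    "airflow.sdk.definitions.mappedoperator",
):
    status = "found" if module_available(name) else "missing"
    print(f"{name}: {status}")
```

If the second module is reported missing while the first is found, that would be consistent with the `task-sdk` package not being installed in that environment.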
   
   **Proposed Solution:**
   Define a local `_is_mapped()` helper function in `taskinstance.py` to avoid the import issue:
   
   ```python
   def _is_mapped(obj) -> bool:
       """Check if an operator is mapped. Local helper to avoid import issues."""
       return getattr(obj, 'is_mapped', False)
   ```
   Alternatively, we could keep the current implementation for now.






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.