Copilot commented on code in PR #64297:
URL: https://github.com/apache/airflow/pull/64297#discussion_r3025328542
##########
task-sdk/tests/task_sdk/execution_time/test_task_runner.py:
##########
@@ -2058,6 +2058,135 @@ def mock_get_all_side_effect(task_id, **kwargs):
assert mock_get_one.called
assert not mock_get_all.called
+ @pytest.mark.parametrize(
+ ("task_ids", "default", "expected_default"),
+ [
+ pytest.param("task_a", "fallback", "fallback",
id="single_task_str_default"),
+ pytest.param("task_a", NOTSET, NOTSET,
id="single_task_NOTSET_default"),
+ pytest.param(["task_a"], "fallback", ["fallback"],
id="list_task_str_default"),
+ pytest.param(
+ ["task_a", "task_b"],
+ "fallback",
+ ["fallback", "fallback"],
+ id="multiple_tasks_str_default",
+ ),
+ ],
+ )
+ def test_xcom_pull_default_with_notset_map_indexes(
+ self,
+ create_runtime_ti,
+ mock_supervisor_comms,
+ task_ids,
+ default,
+ expected_default,
+ ):
+ """Test that xcom_pull returns `default` when no XCom is found and
map_indexes is NOTSET."""
+
+ class CustomOperator(BaseOperator):
+ def execute(self, context):
+ print("This is a custom operator")
Review Comment:
The same `CustomOperator` test helper class is redefined in multiple new
tests in this hunk. To reduce duplication and make future edits easier,
consider defining it once (e.g., as a helper at test-class or module
scope) or reusing an existing operator/test helper if one already exists in
this test module.
##########
task-sdk/tests/task_sdk/execution_time/test_task_runner.py:
##########
@@ -2058,6 +2058,135 @@ def mock_get_all_side_effect(task_id, **kwargs):
assert mock_get_one.called
assert not mock_get_all.called
+ @pytest.mark.parametrize(
+ ("task_ids", "default", "expected_default"),
+ [
+ pytest.param("task_a", "fallback", "fallback",
id="single_task_str_default"),
+ pytest.param("task_a", NOTSET, NOTSET,
id="single_task_NOTSET_default"),
+ pytest.param(["task_a"], "fallback", ["fallback"],
id="list_task_str_default"),
+ pytest.param(
+ ["task_a", "task_b"],
+ "fallback",
+ ["fallback", "fallback"],
+ id="multiple_tasks_str_default",
+ ),
+ ],
+ )
+ def test_xcom_pull_default_with_notset_map_indexes(
+ self,
+ create_runtime_ti,
+ mock_supervisor_comms,
+ task_ids,
+ default,
+ expected_default,
+ ):
+ """Test that xcom_pull returns `default` when no XCom is found and
map_indexes is NOTSET."""
+
+ class CustomOperator(BaseOperator):
+ def execute(self, context):
+ print("This is a custom operator")
+
+ task = CustomOperator(task_id="pull_task")
+ runtime_ti = create_runtime_ti(task=task)
+
+ with patch.object(XCom, "get_all", return_value=None) as mock_get_all:
+ result = runtime_ti.xcom_pull(key="key", task_ids=task_ids,
default=default)
+ assert result == expected_default
+ assert mock_get_all.called
+
+ @pytest.mark.parametrize(
+ ("task_ids", "default", "expected_result"),
+ [
+ pytest.param(
+ "task_a",
+ "fallback",
+ [],
+ id="single_task_empty_list_returns_empty",
+ ),
+ pytest.param(
+ ["task_a"],
+ "fallback",
+ [],
+ id="list_single_task_empty_list_returns_empty",
+ ),
+ pytest.param(
+ ["task_a", "task_b"],
+ "fallback",
+ [],
+ id="multiple_tasks_empty_list_returns_empty",
+ ),
+ ],
+ )
Review Comment:
These expectations appear to encode that an empty list from `XCom.get_all()`
means `xcom_pull()` should return `[]` (even when `default` is provided). If an
empty list represents 'no matching XComs', this should instead return the
provided `default` (similar to the `None` case); otherwise, it can produce
inconsistent output shapes and break invariants (e.g., result length not
matching `task_ids` when pulling multiple tasks). Consider adjusting the
implementation to treat `[]` like 'not found' (same as `None`) and update these
expected results accordingly.
##########
task-sdk/tests/task_sdk/execution_time/test_task_runner.py:
##########
@@ -2058,6 +2058,135 @@ def mock_get_all_side_effect(task_id, **kwargs):
assert mock_get_one.called
assert not mock_get_all.called
+ @pytest.mark.parametrize(
+ ("task_ids", "default", "expected_default"),
+ [
+ pytest.param("task_a", "fallback", "fallback",
id="single_task_str_default"),
+ pytest.param("task_a", NOTSET, NOTSET,
id="single_task_NOTSET_default"),
+ pytest.param(["task_a"], "fallback", ["fallback"],
id="list_task_str_default"),
+ pytest.param(
+ ["task_a", "task_b"],
+ "fallback",
+ ["fallback", "fallback"],
+ id="multiple_tasks_str_default",
+ ),
+ ],
+ )
+ def test_xcom_pull_default_with_notset_map_indexes(
+ self,
+ create_runtime_ti,
+ mock_supervisor_comms,
+ task_ids,
+ default,
+ expected_default,
+ ):
+ """Test that xcom_pull returns `default` when no XCom is found and
map_indexes is NOTSET."""
+
+ class CustomOperator(BaseOperator):
+ def execute(self, context):
+ print("This is a custom operator")
+
+ task = CustomOperator(task_id="pull_task")
+ runtime_ti = create_runtime_ti(task=task)
+
+ with patch.object(XCom, "get_all", return_value=None) as mock_get_all:
+ result = runtime_ti.xcom_pull(key="key", task_ids=task_ids,
default=default)
Review Comment:
These tests rely on the implicit default `map_indexes` value to exercise the
`map_indexes=NOTSET` branch. To make the intent resilient to future
signature/default changes, consider passing `map_indexes=NOTSET` explicitly in
the `xcom_pull(...)` calls for the new coverage.
```suggestion
result = runtime_ti.xcom_pull(
key="key",
task_ids=task_ids,
default=default,
map_indexes=NOTSET,
)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]