ashb commented on code in PR #57952:
URL: https://github.com/apache/airflow/pull/57952#discussion_r2510473033


##########
task-sdk/src/airflow/sdk/definitions/dag.py:
##########
@@ -1360,6 +1377,37 @@ def test(
         return dr
 
 
+def _prepare_next_kwargs(*, event: TriggerEvent | None, defer_message: 
DeferTask) -> dict[str, object] | None:
+    from airflow.serialization.serialized_objects import BaseSerialization
+
+    if event is not None:
+        return {"event": event.payload}
+
+    raw = defer_message.next_kwargs
+    if raw is None:
+        return None
+
+    serialized: object
+    if isinstance(raw, str):
+        try:
+            serialized = json.loads(raw)
+        except json.JSONDecodeError as err:
+            raise ValueError("Deferred next_kwargs must be a JSON object when 
provided as a string.") from err
+    else:
+        serialized = raw
+
+    if isinstance(serialized, dict) and "__type" in serialized and "__var" in 
serialized:
+        deserialized = BaseSerialization.deserialize(serialized)
+        if not isinstance(deserialized, dict):
+            raise TypeError("Deferred next_kwargs must deserialize to a 
mapping.")
+        return deserialized
+
+    if isinstance(serialized, dict):
+        return dict(serialized)
+
+    raise TypeError("Deferred next_kwargs must be provided as a mapping.")

Review Comment:
   This does a lot more than it used to -- this feels out of scope to a "mypy" 
validation on the surface.



##########
task-sdk/tests/task_sdk/definitions/test_dag.py:
##########
@@ -395,6 +406,83 @@ def execute(self, context):
             fail_fast_dag.add_task(task_with_non_default_trigger_rule)
 
 
+class TestDagHelperUtilities:
+    def _make_defer_task(self, **overrides: Any) -> DeferTask:
+        defaults: dict[str, Any] = {
+            "classpath": "airflow.triggers.base.Trigger",
+            "next_method": "resume",
+        }
+        defaults.update(overrides)
+        return DeferTask(**defaults)
+
+    def test_prepare_next_kwargs_prefers_event_payload(self):
+        event_payload = {"value": "from_event"}
+        event = TriggerEvent(payload=event_payload)
+        message = self._make_defer_task()
+
+        result = _prepare_next_kwargs(event=event, defer_message=message)
+
+        assert result == {"event": event_payload}
+
+    def test_prepare_next_kwargs_from_plain_dict_returns_copy(self):
+        mapping = {"value": "from_message"}
+        message = self._make_defer_task(next_kwargs=mapping)
+
+        result = _prepare_next_kwargs(event=None, defer_message=message)
+
+        assert result == mapping
+        assert result is not mapping
+
+    def test_prepare_next_kwargs_from_json_string(self):
+        payload = {"value": "json"}
+        message = self._make_defer_task(next_kwargs=json.dumps(payload))
+
+        result = _prepare_next_kwargs(event=None, defer_message=message)
+
+        assert result == payload
+
+    def test_prepare_next_kwargs_from_serialized_mapping(self):
+        encoded = BaseSerialization.serialize({"value": "serialized"})
+        message = self._make_defer_task(next_kwargs=encoded)
+
+        result = _prepare_next_kwargs(event=None, defer_message=message)
+
+        assert result == {"value": "serialized"}
+
+    def test_prepare_next_kwargs_rejects_non_mapping_json(self):
+        message = self._make_defer_task(next_kwargs=json.dumps([1, 2, 3]))
+
+        with pytest.raises(TypeError, match="Deferred next_kwargs must be 
provided as a mapping"):
+            _prepare_next_kwargs(event=None, defer_message=message)

Review Comment:
   These sort of tests are over-fitting the specific implementation.
   
   We shouldn't be testing the helper method internal to the how the dag.py 
calls them, but testing the public interface raises an error etc.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to