This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3f43bc4a5f4 partial kwargs deserialized MappedOperator set on unmapped 
Operator (#42563)
3f43bc4a5f4 is described below

commit 3f43bc4a5f4b033d1562e32d78801a6406761e47
Author: Fred Thomsen <[email protected]>
AuthorDate: Mon Nov 18 02:08:44 2024 -0500

    partial kwargs deserialized MappedOperator set on unmapped Operator (#42563)
    
    Forwarding the partial kwargs to the underlying operator is done in
    the standard (non-serialization) case and thus it should
    be done here as well for things in the webserver that rely on these
    fixed attributes.
    
    An example is the `Triggered DAG` link for the `TriggerDagRunOperator`.
---
 airflow/models/mappedoperator.py              | 2 ++
 tests/serialization/test_dag_serialization.py | 6 +++++-
 2 files changed, 7 insertions(+), 1 deletion(-)

diff --git a/airflow/models/mappedoperator.py b/airflow/models/mappedoperator.py
index 925acfc16f0..9a5c1b563ce 100644
--- a/airflow/models/mappedoperator.py
+++ b/airflow/models/mappedoperator.py
@@ -821,6 +821,8 @@ class MappedOperator(AbstractOperator):
         from airflow.serialization.serialized_objects import 
SerializedBaseOperator
 
         op = SerializedBaseOperator(task_id=self.task_id, params=self.params, 
_airflow_from_mapped=True)
+        for partial_attr, value in self.partial_kwargs.items():
+            setattr(op, partial_attr, value)
         SerializedBaseOperator.populate_operator(op, self.operator_class)
         if self.dag is not None:  # For Mypy; we only serialize tasks in a DAG 
so the check always satisfies.
             SerializedBaseOperator.set_task_dag_references(op, self.dag)
diff --git a/tests/serialization/test_dag_serialization.py 
b/tests/serialization/test_dag_serialization.py
index d10984556ff..f04ff3e2568 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -2529,7 +2529,11 @@ def test_operator_expand_deserialized_unmap():
     ser_normal = BaseSerialization.serialize(normal)
     deser_normal = BaseSerialization.deserialize(ser_normal)
     deser_normal.dag = None
-    assert deser_mapped.unmap(None) == deser_normal
+    unmapped_deser_mapped = deser_mapped.unmap(None)
+
+    assert type(unmapped_deser_mapped) is type(deser_normal) is 
SerializedBaseOperator
+    assert unmapped_deser_mapped.task_id == deser_normal.task_id == "a"
+    assert unmapped_deser_mapped.executor_config == 
deser_normal.executor_config == {"a": "b"}
 
 
 @pytest.mark.db_test

Reply via email to