uranusjr commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r780938792



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -623,7 +647,7 @@ def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator:
 
         for k, v in encoded_op.items():
 
-            if k == "_downstream_task_ids":
+            if k in {"_downstream_task_ids", "downstream_task_ids"}:

Review comment:
       Maybe we should rename `downstream_task_ids` to `_downstream_task_ids` instead? This feels a bit awkward.
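If both spellings do have to be accepted, one hedged alternative to checking a set of names inside the loop is to normalize keys once, up front, so the loop only ever compares against the canonical `_downstream_task_ids`. This is a minimal sketch; `_KEY_ALIASES` and `normalize_keys` are hypothetical names, not Airflow APIs.

```python
from typing import Any, Dict

# Hypothetical alias table: alternate spelling -> canonical key.
_KEY_ALIASES: Dict[str, str] = {"downstream_task_ids": "_downstream_task_ids"}


def normalize_keys(encoded_op: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of encoded_op with aliased keys renamed to their canonical form."""
    normalized: Dict[str, Any] = {}
    for key, value in encoded_op.items():
        canonical = _KEY_ALIASES.get(key, key)
        if canonical in normalized:
            # Both spellings present: refuse to guess which one wins.
            raise ValueError(f"duplicate key after normalization: {canonical!r}")
        normalized[canonical] = value
    return normalized


# The deserialization loop then only has to check a single name.
encoded = {"task_id": "t1", "downstream_task_ids": ["t2", "t3"]}
for k, v in normalize_keys(encoded).items():
    if k == "_downstream_task_ids":
        downstream = set(v)
```

Renaming the key at the serializer, as suggested above, would make even this alias table unnecessary.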

##########
File path: airflow/models/baseoperator.py
##########
@@ -1741,6 +1788,28 @@ def leaves(self) -> List["MappedOperator"]:
     def has_dag(self):
         return self.dag is not None
 
+    @property
+    def inherits_from_dummy_operator(self):
+        """Used to determine if an Operator inherits from DummyOperator"""
+        # This looks like `isinstance(self, DummyOperator)` would work, but this also
+        # needs to cope when `self` is a Serialized instance of a DummyOperator or one
+        # of its sub-classes (which don't inherit from anything but BaseOperator).
+        return getattr(self, '_is_dummy', False)

Review comment:
       `_is_dummy` has a default, so when can it be unset?
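To make the question concrete: assuming the default is a class attribute on the base operator class, plain attribute access cannot fail on any instance of that class or its subclasses, so the `getattr(..., False)` fallback only matters for objects that do not derive from that base at all. A minimal sketch; `BaseOp`, `DummyOp`, `CustomDummy`, and `SerializedOp` are hypothetical stand-ins, not Airflow classes.

```python
class BaseOp:
    # Class-level default: every instance of BaseOp or a subclass sees this.
    _is_dummy = False


class DummyOp(BaseOp):
    _is_dummy = True


class CustomDummy(DummyOp):
    pass


class SerializedOp(BaseOp):
    """Hypothetical stand-in for a deserialized operator: derives from BaseOp only."""


def inherits_from_dummy(op: BaseOp) -> bool:
    # Plain attribute access is enough: the class default guarantees the attribute exists.
    return op._is_dummy


restored = SerializedOp()
restored._is_dummy = True  # deserialization would copy the flag onto the instance
```

`isinstance(restored, DummyOp)` is still `False` here, which is why a flag is needed at all; the `getattr` default is what looks redundant.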

##########
File path: airflow/models/baseoperator.py
##########
@@ -207,6 +207,7 @@ def apply_defaults(self: "BaseOperator", *args: Any, **kwargs: Any) -> Any:
 
             result = func(self, **kwargs, default_args=default_args)
             # Store the args passed to init -- we need them to support task.map serialization!
+            kwargs.pop('task_id', None)

Review comment:
       I don’t get what this implies.
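One possible reading, offered as a sketch rather than the author's stated intent: because `kwargs.pop('task_id', None)` runs after `func(self, **kwargs, default_args=default_args)` has already been called, it cannot change what `__init__` receives. It can only affect later uses of `kwargs` inside the wrapper, such as storing them for mapping. `pop()` with a default also never raises if the key is absent. `record_init_kwargs` and `_init_kwargs` are hypothetical names.

```python
import functools
from typing import Any, Callable


def record_init_kwargs(init: Callable[..., None]) -> Callable[..., None]:
    """Hypothetical decorator: run __init__, then store its kwargs minus task_id."""

    @functools.wraps(init)
    def wrapper(self: Any, **kwargs: Any) -> None:
        init(self, **kwargs)
        # The call above already received task_id; popping now only affects
        # what is stored below. The None default means no KeyError if absent.
        kwargs.pop("task_id", None)
        self._init_kwargs = kwargs

    return wrapper


class Op:
    @record_init_kwargs
    def __init__(self, *, task_id: str, retries: int = 0) -> None:
        self.task_id = task_id
        self.retries = retries


op = Op(task_id="t1", retries=3)
```

If that is the intent, a short comment next to the `pop` saying why `task_id` must not be stored would answer the question.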

##########
File path: airflow/models/baseoperator.py
##########
@@ -1663,24 +1664,64 @@ def _validate_kwarg_names_for_mapping(cls: Type[BaseOperator], func_name: str, v
         raise TypeError(f'{cls.__name__}.{func_name} got unexpected keyword arguments {names}')
 
 
-@attr.define(kw_only=True)
+def _MappedOperator_minimal_repr(cls, fields):
+    results = []
+    fields = iter(fields)
+    for field in fields:
+        results.append(field)
+        if field.name == "dag":
+            # Everything after the 'dag' attribute is excluded from repr
+            break
+
+    for field in fields:
+        results.append(field.evolve(repr=False))
+    return results

Review comment:
       This feels much too magical to me 🙁 Why not add `repr=False` explicitly instead, or implement a custom `__repr__`?
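Both suggested alternatives are ordinary, declarative features of class-generation libraries. The sketch below uses the standard library's `dataclasses` as a stand-in for `attrs` (whose per-field equivalent is `attr.ib(repr=False)`); the field names are illustrative only.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ExplicitRepr:
    """Alternative 1: mark each hidden field with repr=False where it is defined."""

    task_id: str
    dag: Optional[str] = None
    partial_kwargs: dict = field(default_factory=dict, repr=False)
    mapped_kwargs: dict = field(default_factory=dict, repr=False)


@dataclass(repr=False)
class CustomRepr:
    """Alternative 2: skip the generated __repr__ and write one by hand."""

    task_id: str
    dag: Optional[str] = None
    partial_kwargs: dict = field(default_factory=dict)

    def __repr__(self) -> str:
        return f"{type(self).__name__}(task_id={self.task_id!r}, dag={self.dag!r})"
```

Either way, a reader can tell from the field definition (or the one `__repr__`) what appears in the repr, without depending on field order relative to `dag`.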

##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -653,10 +677,14 @@ def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator:
                 v = cls._deserialize(v)
             # else use v as it is
 
-            setattr(op, k, v)
+            if hasattr(op, k) and isinstance(v, set):
+                getattr(op, k).update(v)
+            else:
+                setattr(op, k, v)
 
         for k in op.get_serialized_fields() - encoded_op.keys() - cls._CONSTRUCTOR_PARAMS.keys():
-            setattr(op, k, None)
+            if not hasattr(op, k):
+                setattr(op, k, None)

Review comment:
       Same question for this.
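What the `hasattr` guard changes, sketched with a hypothetical class: `hasattr` also finds class-level attributes, so the guarded version keeps any existing default (instance or class) and only fills names that do not resolve at all, whereas the previous code overwrote every missing serialized field with `None`. `Op`, `ui_color`, and both helper functions are illustrative.

```python
from typing import Iterable


class Op:
    # Hypothetical class-level default that serialized data might omit.
    ui_color = "#fff"


def fill_missing_unconditionally(op: object, names: Iterable[str]) -> None:
    """Previous behaviour: every missing serialized field becomes None."""
    for k in names:
        setattr(op, k, None)


def fill_missing_guarded(op: object, names: Iterable[str]) -> None:
    """Guarded behaviour: only names that do not resolve at all become None."""
    for k in names:
        # hasattr() also sees class attributes, so class defaults are kept.
        if not hasattr(op, k):
            setattr(op, k, None)
```

Whether keeping the class default is the intended fix for some field is exactly what the change does not say.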

##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -653,10 +677,14 @@ def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator:
                 v = cls._deserialize(v)
             # else use v as it is
 
-            setattr(op, k, v)
+            if hasattr(op, k) and isinstance(v, set):
+                getattr(op, k).update(v)

Review comment:
       What does this cover? Too hacky IMO.
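One risk with merging via `update()`, sketched under the assumption that the existing attribute can be a set shared at class level: the set is mutated in place, so restored values leak into every instance that shares it, and members already present survive the restore. It also raises `AttributeError` if the existing attribute is `None`. Plain `setattr` avoids all three. `Op`, `restore_merge`, and `restore_replace` are hypothetical.

```python
class Op:
    # Hypothetical mutable class-level default shared by every instance.
    tags: set = set()


def restore_replace(op: object, k: str, v: set) -> None:
    # Rebinds the attribute on the instance; shared state is untouched.
    setattr(op, k, v)


def restore_merge(op: object, k: str, v: set) -> None:
    # Mirrors the diff: merge into an existing set instead of replacing it.
    if hasattr(op, k) and isinstance(v, set):
        getattr(op, k).update(v)
    else:
        setattr(op, k, v)
```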




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
