This is an automated email from the ASF dual-hosted git repository.

husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d800a0de51 Combine similar if logics in core (#33988)
d800a0de51 is described below

commit d800a0de5194bb1ef3cfad44c874abafcc78efd6
Author: Hussein Awala <[email protected]>
AuthorDate: Sat Sep 9 17:48:52 2023 +0200

    Combine similar if logics in core (#33988)
    
    * Combine similar if logics in core
    
    * Update airflow/models/baseoperator.py
    
    Co-authored-by: Tzu-ping Chung <[email protected]>
    
    * replace in tuple by multiple or equalities
    
    ---------
    
    Co-authored-by: Tzu-ping Chung <[email protected]>
---
 airflow/models/baseoperator.py              |  4 +---
 airflow/serialization/serialized_objects.py |  8 +++++---
 airflow/ti_deps/deps/trigger_rule_dep.py    | 15 +--------------
 3 files changed, 7 insertions(+), 20 deletions(-)

diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index a7414397ea..3aa67c8eca 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -1107,9 +1107,7 @@ class BaseOperator(AbstractOperator, 
metaclass=BaseOperatorMeta):
 
         if self.__from_mapped:
             pass  # Don't add to DAG -- the mapped task takes the place.
-        elif self.task_id not in dag.task_dict:
-            dag.add_task(self)
-        elif self.task_id in dag.task_dict and dag.task_dict[self.task_id] is 
not self:
+        elif dag.task_dict.get(self.task_id) is not self:
             dag.add_task(self)
 
         self._dag = dag
diff --git a/airflow/serialization/serialized_objects.py 
b/airflow/serialization/serialized_objects.py
index fa8e638d80..38f5515111 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -1000,9 +1000,11 @@ class SerializedBaseOperator(BaseOperator, 
BaseSerialization):
                 v = {arg: cls.deserialize(value) for arg, value in v.items()}
             elif k in {"expand_input", "op_kwargs_expand_input"}:
                 v = _ExpandInputRef(v["type"], cls.deserialize(v["value"]))
-            elif k in cls._decorated_fields or k not in 
op.get_serialized_fields():
-                v = cls.deserialize(v)
-            elif k in ("outlets", "inlets"):
+            elif (
+                k in cls._decorated_fields
+                or k not in op.get_serialized_fields()
+                or k in ("outlets", "inlets")
+            ):
                 v = cls.deserialize(v)
             elif k == "on_failure_fail_dagrun":
                 k = "_on_failure_fail_dagrun"
diff --git a/airflow/ti_deps/deps/trigger_rule_dep.py 
b/airflow/ti_deps/deps/trigger_rule_dep.py
index c3dfb71e6c..7bb4bf5213 100644
--- a/airflow/ti_deps/deps/trigger_rule_dep.py
+++ b/airflow/ti_deps/deps/trigger_rule_dep.py
@@ -471,20 +471,7 @@ class TriggerRuleDep(BaseTIDep):
                             f"upstream_task_ids={task.upstream_task_ids}"
                         )
                     )
-            elif trigger_rule == TR.NONE_FAILED:
-                num_failures = upstream - success - skipped
-                if ti.map_index > -1:
-                    num_failures -= removed
-                if num_failures > 0:
-                    yield self._failing_status(
-                        reason=(
-                            f"Task's trigger rule '{trigger_rule}' requires 
all upstream tasks to have "
-                            f"succeeded or been skipped, but found 
{num_failures} non-success(es). "
-                            f"upstream_states={upstream_states}, "
-                            f"upstream_task_ids={task.upstream_task_ids}"
-                        )
-                    )
-            elif trigger_rule == TR.NONE_FAILED_MIN_ONE_SUCCESS:
+            elif trigger_rule == TR.NONE_FAILED or trigger_rule == 
TR.NONE_FAILED_MIN_ONE_SUCCESS:
                 num_failures = upstream - success - skipped
                 if ti.map_index > -1:
                     num_failures -= removed

Reply via email to