This is an automated email from the ASF dual-hosted git repository.
husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d800a0de51 Combine similar if logics in core (#33988)
d800a0de51 is described below
commit d800a0de5194bb1ef3cfad44c874abafcc78efd6
Author: Hussein Awala <[email protected]>
AuthorDate: Sat Sep 9 17:48:52 2023 +0200
Combine similar if logics in core (#33988)
* Combine similar if logics in core
* Update airflow/models/baseoperator.py
Co-authored-by: Tzu-ping Chung <[email protected]>
* replace in tuple by multiple or equalities
---------
Co-authored-by: Tzu-ping Chung <[email protected]>
---
airflow/models/baseoperator.py | 4 +---
airflow/serialization/serialized_objects.py | 8 +++++---
airflow/ti_deps/deps/trigger_rule_dep.py | 15 +--------------
3 files changed, 7 insertions(+), 20 deletions(-)
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index a7414397ea..3aa67c8eca 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -1107,9 +1107,7 @@ class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta):
if self.__from_mapped:
pass # Don't add to DAG -- the mapped task takes the place.
- elif self.task_id not in dag.task_dict:
- dag.add_task(self)
- elif self.task_id in dag.task_dict and dag.task_dict[self.task_id] is not self:
+ elif dag.task_dict.get(self.task_id) is not self:
dag.add_task(self)
self._dag = dag
diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index fa8e638d80..38f5515111 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -1000,9 +1000,11 @@ class SerializedBaseOperator(BaseOperator, BaseSerialization):
v = {arg: cls.deserialize(value) for arg, value in v.items()}
elif k in {"expand_input", "op_kwargs_expand_input"}:
v = _ExpandInputRef(v["type"], cls.deserialize(v["value"]))
- elif k in cls._decorated_fields or k not in op.get_serialized_fields():
- v = cls.deserialize(v)
- elif k in ("outlets", "inlets"):
+ elif (
+ k in cls._decorated_fields
+ or k not in op.get_serialized_fields()
+ or k in ("outlets", "inlets")
+ ):
v = cls.deserialize(v)
elif k == "on_failure_fail_dagrun":
k = "_on_failure_fail_dagrun"
diff --git a/airflow/ti_deps/deps/trigger_rule_dep.py b/airflow/ti_deps/deps/trigger_rule_dep.py
index c3dfb71e6c..7bb4bf5213 100644
--- a/airflow/ti_deps/deps/trigger_rule_dep.py
+++ b/airflow/ti_deps/deps/trigger_rule_dep.py
@@ -471,20 +471,7 @@ class TriggerRuleDep(BaseTIDep):
f"upstream_task_ids={task.upstream_task_ids}"
)
)
- elif trigger_rule == TR.NONE_FAILED:
- num_failures = upstream - success - skipped
- if ti.map_index > -1:
- num_failures -= removed
- if num_failures > 0:
- yield self._failing_status(
- reason=(
- f"Task's trigger rule '{trigger_rule}' requires all upstream tasks to have "
- f"succeeded or been skipped, but found {num_failures} non-success(es). "
- f"upstream_states={upstream_states}, "
- f"upstream_task_ids={task.upstream_task_ids}"
- )
- )
- elif trigger_rule == TR.NONE_FAILED_MIN_ONE_SUCCESS:
+ elif trigger_rule == TR.NONE_FAILED or trigger_rule == TR.NONE_FAILED_MIN_ONE_SUCCESS:
num_failures = upstream - success - skipped
if ti.map_index > -1:
num_failures -= removed