amoghrajesh commented on code in PR #57853:
URL: https://github.com/apache/airflow/pull/57853#discussion_r2517165599


##########
airflow-core/src/airflow/serialization/serialized_objects.py:
##########
@@ -1862,10 +1880,37 @@ def _is_excluded(cls, var: Any, attrname: str, op: DAGNode):
         # Check if value matches client_defaults (hierarchical defaults optimization)
         if cls._matches_client_defaults(var, attrname):
             return True
-        schema_defaults = cls.get_schema_defaults("operator")
 
+        # for const fields, we should always be excluded when False, regardless of client_defaults
+        const_fields = cls.get_operator_const_fields()
+        if attrname in const_fields and var is False:
+            return True
+
+        schema_defaults = cls.get_schema_defaults("operator")
         if attrname in schema_defaults:
             if schema_defaults[attrname] == var:
+                # If it also matches client_defaults, exclude (optimization)
+                client_defaults = cls.generate_client_defaults()
+                if attrname in client_defaults and client_defaults[attrname] == var:
+                    return True
+
+                # If client_defaults differs, preserve explicit override from user
+                # Example: default_args={"retries": 0}, schema default=0, client_defaults={"retries": 3}
+                if attrname in client_defaults and client_defaults[attrname] != var:
+                    if op.has_dag():
+                        dag = op.dag
+                        if dag and attrname in dag.default_args and dag.default_args[attrname] == var:
+                            return False
+                    if (
+                        hasattr(op, "_BaseOperator__init_kwargs")
+                        and attrname in op._BaseOperator__init_kwargs
+                        and op._BaseOperator__init_kwargs[attrname] == var
+                    ):
+                        return False

Review Comment:
   @Lee-W I do not entirely follow this nit. What are we trying to do?



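For context on the branch above: a minimal standalone sketch of the precedence it appears to preserve, with plain dicts standing in for the serializer internals and an illustrative `should_exclude` helper that is not the real method. The point is that a value equal to the schema default but different from client_defaults must not be dropped when the user set it explicitly; otherwise `retries=0` from `default_args` would deserialize back as the client default of 3.

    # Illustrative only: simplified exclusion decision for a single attribute.
    def should_exclude(attrname, value, schema_defaults, client_defaults, explicit_kwargs):
        if attrname not in schema_defaults or schema_defaults[attrname] != value:
            return False
        if client_defaults.get(attrname) == value:
            # Matches both schema and client defaults: safe to drop from the payload.
            return True
        if attrname in explicit_kwargs and explicit_kwargs[attrname] == value:
            # Explicit user value (e.g. default_args={"retries": 0}) must survive,
            # otherwise deserialization falls back to client_defaults (retries=3).
            return False
        return True

    assert should_exclude("retries", 0, {"retries": 0}, {"retries": 3}, {"retries": 0}) is False
    assert should_exclude("retries", 0, {"retries": 0}, {"retries": 0}, {}) is True
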
##########
airflow-core/tests/unit/serialization/test_dag_serialization.py:
##########
@@ -113,15 +113,24 @@ def operator_defaults(overrides):
         with operator_defaults({"retries": 2, "retry_delay": 200.0}):
             # Test code with modified operator defaults
     """
+    import airflow.sdk.definitions._internal.abstractoperator as abstract_op_module
     from airflow.sdk.bases.operator import OPERATOR_DEFAULTS
 
     original_values = {}
+    original_module_constants = {}
     try:
         # Store original values and apply overrides
         for key, value in overrides.items():
             original_values[key] = OPERATOR_DEFAULTS.get(key)
             OPERATOR_DEFAULTS[key] = value
 
+            # Also patch module-level constants like DEFAULT_RETRIES
+            # These are frozen at import time and used as default parameter values
+            const_name = f"DEFAULT_{key.upper()}"
+            if hasattr(abstract_op_module, const_name):
+                original_module_constants[const_name] = getattr(abstract_op_module, const_name)
+                setattr(abstract_op_module, const_name, value)
+

Review Comment:
   Added in last commit, good idea.


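For reference, a minimal sketch of the patch-and-restore pattern the fixture applies to module-level constants such as DEFAULT_RETRIES. The `patch_module_constant` helper and the stand-in module below are illustrative, not part of the test code.

    # Illustrative only: temporarily override a module-level constant and
    # restore it on exit, mirroring how the fixture handles names that were
    # frozen at import time.
    import types
    from contextlib import contextmanager

    @contextmanager
    def patch_module_constant(module, const_name, value):
        missing = object()
        original = getattr(module, const_name, missing)
        setattr(module, const_name, value)
        try:
            yield
        finally:
            if original is missing:
                delattr(module, const_name)
            else:
                setattr(module, const_name, original)

    fake_module = types.SimpleNamespace(DEFAULT_RETRIES=0)
    with patch_module_constant(fake_module, "DEFAULT_RETRIES", 2):
        assert fake_module.DEFAULT_RETRIES == 2
    assert fake_module.DEFAULT_RETRIES == 0  # restored after the context exits
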

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
