jedcunningham commented on code in PR #54569:
URL: https://github.com/apache/airflow/pull/54569#discussion_r2311332255


##########
airflow-core/src/airflow/serialization/serialized_objects.py:
##########
@@ -1752,8 +1929,216 @@ def expand_start_from_trigger(self, *, context: Context) -> bool:
         """
         return self.start_from_trigger
 
-    def get_serialized_fields(self):
-        return BaseOperator.get_serialized_fields()
+    @classmethod
+    def get_serialized_fields(cls):
+        """Fields to deserialize from the serialized JSON object."""
+        return frozenset(
+            {
+                "_logger_name",
+                "_needs_expansion",
+                "_task_display_name",
+                "allow_nested_operators",
+                "depends_on_past",
+                "do_xcom_push",
+                "doc",
+                "doc_json",
+                "doc_md",
+                "doc_rst",
+                "doc_yaml",
+                "downstream_task_ids",
+                "email",
+                "email_on_failure",
+                "email_on_retry",
+                "end_date",
+                "execution_timeout",
+                "executor",
+                "executor_config",
+                "ignore_first_depends_on_past",
+                "inlets",
+                "is_setup",
+                "is_teardown",
+                "map_index_template",
+                "max_active_tis_per_dag",
+                "max_active_tis_per_dagrun",
+                "max_retry_delay",
+                "multiple_outputs",
+                "has_on_execute_callback",
+                "has_on_failure_callback",
+                "has_on_retry_callback",
+                "has_on_skipped_callback",
+                "has_on_success_callback",
+                "on_failure_fail_dagrun",
+                "outlets",
+                "owner",
+                "params",
+                "pool",
+                "pool_slots",
+                "priority_weight",
+                "queue",
+                "resources",
+                "retries",
+                "retry_delay",
+                "retry_exponential_backoff",
+                "run_as_user",
+                "start_date",
+                "start_from_trigger",
+                "start_trigger_args",  # NOT PART OF SCHEMA YET

Review Comment:
   It is now yeah?
   
   ```suggestion
                   "start_trigger_args",
   ```



##########
airflow-core/docs/administration-and-deployment/dag-serialization.rst:
##########
@@ -119,3 +119,140 @@ define a ``json`` variable in local Airflow settings (``airflow_local_settings.py``)
 
 See :ref:`Configuring local settings <set-config:configuring-local-settings>` for details on how to
 configure local settings.
+
+
+.. _dag-serialization-defaults:
+
+DAG Serialization with Default Values (Airflow 3.1+)
+------------------------------------------------------
+
+Starting with Airflow 3.1, DAG serialization establishes a versioned contract between Task SDKs
+and Airflow server components (Scheduler & API-Server). Combined with the Task Execution API, this
+decouples client and server components, enabling independent deployments and upgrades while maintaining
+backward compatibility and automatic default value resolution.
+
+How Default Values Work
+~~~~~~~~~~~~~~~~~~~~~~~
+
+When Airflow processes DAGs, it applies default values in a specific order of precedence for the server:
+
+1. **Schema defaults**: Built-in Airflow defaults (lowest priority)
+2. **Client defaults**: SDK-specific defaults
+3. **DAG default_args**: DAG-level settings (existing behavior)
+4. **Partial arguments**: MappedOperator shared values
+5. **Task values**: Explicit task settings (highest priority)
+
+This means you can set defaults at different levels and more specific settings will override
+more general ones.
+
+JSON Structure
+~~~~~~~~~~~~~~
+
+Serialized DAGs now include a ``client_defaults`` section that contains common default values:
+
+.. code-block:: json
+
+    {
+      "__version": 2,
+      "client_defaults": {
+        "tasks": {
+          "retry_delay": 300.0,
+          "owner": "data_team"
+        }
+      },
+      "dag": {
+        "dag_id": "example_dag",
+        "default_args": {
+          "retries": 3
+        },
+        "tasks": [{
+          "task_id": "example_task",
+          "task_type": "BashOperator",
+          "_task_module": "airflow.operators.bash",
+          "bash_command": "echo hello",
+          "owner": "specific_owner"
+        }]
+      }
+    }
+
+How Values Are Applied
+~~~~~~~~~~~~~~~~~~~~~~
+
+In the example above, the task ``example_task`` will have these final values:
+
+- **retry_delay**: 300.0 (from client_defaults.tasks)
+- **owner**: "data_team" (from client_defaults.tasks)
+- **retries**: 3 (from dag.default_args, overrides client_defaults)
+- **bash_command**: "echo hello" (explicit task value)
+- **pool**: "default_pool" (from schema defaults)
+
+The system automatically fills in any missing values by walking up the hierarchy.
+
+MappedOperator Default Handling
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+MappedOperators (dynamic task mapping) also participate in the default value system:
+
+.. code-block:: python
+
+    # DAG Definition
+    BashOperator.partial(task_id="mapped_task", retries=2, owner="team_lead").expand(
+        bash_command=["echo 1", "echo 2", "echo 3"]
+    )
+
+In this example, each of the three generated task instances will inherit:
+
+- **retries**: 2 (from partial arguments)
+- **owner**: "team_lead" (from partial arguments)
+- **pool**: "default_pool" (from client_defaults, since not specified in partial)
+- **bash_command**: "echo 1", "echo 2", or "echo 3" respectively (from expand)
+
+Independent Deployment Architecture
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+**Decoupled Components:**
+The serialization contract, combined with the Task Execution API, enables complete separation between:
+
+- **Server Components** (Scheduler, API-Server, Webserver): Handle orchestration, don't run user code

Review Comment:
   ```suggestion
   - **Server Components** (Scheduler, API-Server): Handle orchestration, don't run user code
   ```



##########
airflow-core/src/airflow/serialization/schema.json:
##########
@@ -261,58 +261,77 @@
         "template_fields"
       ],
       "properties": {
-        "task_type": { "type": "string" },
+        "task_type": { "type": "string", "default": "BaseOperator"},
         "_task_module": { "type": "string" },
         "_operator_extra_links": { "$ref":  "#/definitions/extra_links" },
         "task_id": { "type": "string" },
-        "task_display_name": { "type": "string" },
-        "label": { "type": "string" },
-        "owner": { "type": "string" },
+        "_task_display_name": { "type": "string" },
+        "owner": { "type": "string", "default": "airflow" },
         "start_date": { "$ref": "#/definitions/datetime" },
         "end_date": { "$ref": "#/definitions/datetime" },
-        "trigger_rule": { "type": "string" },
-        "depends_on_past": { "type": "boolean" },
-        "ignore_first_depends_on_past": { "type": "boolean" },
-        "wait_for_past_depends_before_skipping": { "type": "boolean" },
-        "wait_for_downstream": { "type": "boolean" },
-        "retries": { "type": "number" },
-        "queue": { "type": "string" },
-        "pool": { "type": "string" },
-        "pool_slots": { "type": "number" },
+        "trigger_rule": { "type": "string", "default": "all_success" },
+        "depends_on_past": { "type": "boolean", "default": false },
+        "ignore_first_depends_on_past": { "type": "boolean", "default": false },
+        "wait_for_past_depends_before_skipping": { "type": "boolean", "default": false },
+        "wait_for_downstream": { "type": "boolean", "default": false },
+        "retries": { "type": "number", "default": 0 },
+        "queue": { "type": "string", "default": "default" },
+        "pool": { "type": "string", "default": "default_pool" },
+        "pool_slots": { "type": "number", "default": 1 },
         "execution_timeout": { "$ref": "#/definitions/timedelta" },
         "retry_delay": { "$ref": "#/definitions/timedelta" },
-        "retry_exponential_backoff": { "type": "boolean" },
+        "retry_exponential_backoff": { "type": "boolean", "default": false },
         "max_retry_delay": { "$ref": "#/definitions/timedelta" },
         "params": { "$ref": "#/definitions/params" },
-        "priority_weight": { "type": "number" },
-        "weight_rule": { "type": "string" },
+        "priority_weight": { "type": "number", "default": 1 },
+        "weight_rule": { "type": "string", "default": "downstream" },
         "executor": { "type": "string" },
-        "executor_config": { "$ref": "#/definitions/dict" },
-        "do_xcom_push": { "type": "boolean" },
-        "ui_color": { "$ref": "#/definitions/color" },
-        "ui_fgcolor": { "$ref": "#/definitions/color" },
+        "executor_config": { "$ref": "#/definitions/dict"},

Review Comment:
   ```suggestion
           "executor_config": { "$ref": "#/definitions/dict" },
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to