This is an automated email from the ASF dual-hosted git repository.
vatsrahul1001 pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 5357677bf4bdd97da47b5a1d70207952994917df
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri May 29 20:07:11 2026 +0530
[v3-2-test] Cache BaseOperator signature in OperatorSerialization (#67701)
(#67708)
* [v3-2-test] Cache BaseOperator signature in OperatorSerialization (#67701)
(cherry picked from commit 5799f9d672b80d01556b70eace88238d7b61665c)
Co-authored-by: Shahar Epstein <[email protected]>
* Fix serialization unit tests to avoid ORM side effects in --skip-db mode
(#65206)
(cherry picked from commit 2007bff00b14fd82dff08ce78649681e9a867d1b)
---------
Co-authored-by: Shahar Epstein <[email protected]>
Co-authored-by: Pavel Grakhov <[email protected]>
---
.../airflow/serialization/serialized_objects.py | 10 ++--
.../unit/serialization/test_serialized_objects.py | 54 +++++++++-------------
2 files changed, 28 insertions(+), 36 deletions(-)
diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py
index a0295dc2370..351a0b6cf42 100644
--- a/airflow-core/src/airflow/serialization/serialized_objects.py
+++ b/airflow-core/src/airflow/serialization/serialized_objects.py
@@ -966,6 +966,12 @@ class OperatorSerialization(DAGNode, BaseSerialization):
_const_fields: ClassVar[set[str] | None] = None
+ # Parameters of BaseOperator.__init__ that must not appear in template_fields.
+ # Computed once at class-load time: the signature never changes during a process.
+ _FORBIDDEN_TEMPLATE_FIELDS: ClassVar[frozenset[str]] = frozenset(
+ signature(BaseOperator.__init__).parameters
+ ) - {"email"}
+
@classmethod
def serialize_mapped_operator(cls, op: MappedOperator) -> dict[str, Any]:
serialized_op = cls._serialize_node(op)
@@ -1046,9 +1052,7 @@ class OperatorSerialization(DAGNode, BaseSerialization):
# Store all template_fields as they are if there are JSON Serializable
# If not, store them as strings
# And raise an exception if the field is not templateable
- forbidden_fields = set(signature(BaseOperator.__init__).parameters.keys())
- # Though allow some of the BaseOperator fields to be templated anyway
- forbidden_fields.difference_update({"email"})
+ forbidden_fields = cls._FORBIDDEN_TEMPLATE_FIELDS
if op.template_fields:
for template_field in op.template_fields:
if template_field in forbidden_fields:
diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py b/airflow-core/tests/unit/serialization/test_serialized_objects.py
index c117561273b..06754923137 100644
--- a/airflow-core/tests/unit/serialization/test_serialized_objects.py
+++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py
@@ -32,6 +32,7 @@ from pendulum.tz.timezone import FixedTimezone, Timezone
from uuid6 import uuid7
from airflow._shared.timezones import timezone
+from airflow.api_fastapi.execution_api.datamodels import taskinstance as ti_datamodel
from airflow.callbacks.callback_requests import DagCallbackRequest, TaskCallbackRequest
from airflow.exceptions import (
AirflowException,
@@ -42,8 +43,6 @@ from airflow.exceptions import (
)
from airflow.models.connection import Connection
from airflow.models.dag import DAG
-from airflow.models.dagrun import DagRun
-from airflow.models.taskinstance import TaskInstance
from airflow.models.xcom_arg import XComArg
from airflow.partition_mappers.identity import IdentityMapper as CoreIdentityMapper
from airflow.partition_mappers.temporal import (
@@ -104,12 +103,9 @@ from airflow.serialization.serialized_objects import (
DagSerialization,
LazyDeserializedDAG,
_has_kubernetes,
- create_scheduler_operator,
)
from airflow.triggers.base import BaseTrigger
from airflow.utils.db import LazySelectSequence
-from airflow.utils.state import DagRunState, State
-from airflow.utils.types import DagRunType
from unit.models import DEFAULT_DATE
@@ -225,32 +221,15 @@ def test_serde_validate_schema_valid_json():
assert t.obj == {"foo": "bar"}
-TI = TaskInstance(
- task=create_scheduler_operator(EmptyOperator(task_id="test-task")),
+TASK_CALLBACK_TI = ti_datamodel.TaskInstance(
+ id=uuid7(),
+ task_id="test-task",
+ dag_id="test-dag",
run_id="fake_run",
- state=State.RUNNING,
+ try_number=1,
dag_version_id=uuid7(),
)
-TI_WITH_START_DAY = TaskInstance(
- task=create_scheduler_operator(EmptyOperator(task_id="test-task")),
- run_id="fake_run",
- state=State.RUNNING,
- dag_version_id=uuid7(),
-)
-TI_WITH_START_DAY.start_date = timezone.datetime(2020, 1, 1, 0, 0, 0)
-
-DAG_RUN = DagRun(
- dag_id="test_dag_id",
- run_id="test_dag_run_id",
- run_type=DagRunType.MANUAL,
- logical_date=timezone.utcnow(),
- start_date=timezone.utcnow(),
- state=DagRunState.SUCCESS,
-)
-DAG_RUN.id = 1
-
-
# we add the tasks out of order, to ensure they are deserialized in the correct order
DAG_WITH_TASKS = DAG(dag_id="test_dag", start_date=datetime.now())
EmptyOperator(task_id="task2", dag=DAG_WITH_TASKS)
@@ -400,15 +379,10 @@ class MockLazySelectSequence(LazySelectSequence):
DAT.ASSET,
equal_serialized_asset,
),
- (
- Connection(conn_id="TEST_ID", uri="mysql://"),
- DAT.CONNECTION,
- lambda a, b: a.get_uri() == b.get_uri(),
- ),
(
TaskCallbackRequest(
filepath="filepath",
- ti=TI,
+ ti=TASK_CALLBACK_TI,
bundle_name="testing",
bundle_version=None,
),
@@ -504,6 +478,19 @@ def test_serialize_deserialize(input, encoded_type, cmp_func):
json.dumps(serialized) # does not raise
+@pytest.mark.db_test
+def test_serialize_deserialize_connection():
+ from airflow.serialization.serialized_objects import BaseSerialization
+
+ connection = Connection(conn_id="TEST_ID", uri="mysql://")
+ serialized = BaseSerialization.serialize(connection)
+ json.dumps(serialized)
+ assert serialized[Encoding.TYPE] == DAT.CONNECTION
+
+ deserialized = BaseSerialization.deserialize(serialized)
+ assert deserialized.get_uri() == connection.get_uri()
+
+
@pytest.mark.parametrize("reference", REFERENCE_TYPES)
def test_serialize_deserialize_deadline_alert(reference):
public_deadline_alert_fields = {
@@ -541,6 +528,7 @@ def test_serialize_deserialize_deadline_alert(reference):
),
],
)
+@pytest.mark.db_test
def test_backcompat_deserialize_connection(conn_uri):
"""Test deserialize connection which serialised by previous serializer
implementation."""
from airflow.serialization.serialized_objects import BaseSerialization