Copilot commented on code in PR #64626:
URL: https://github.com/apache/airflow/pull/64626#discussion_r3029884029
##########
airflow-core/src/airflow/models/trigger.py:
##########
@@ -210,6 +209,8 @@ def bulk_fetch(cls, ids: Iterable[int], session: Session = NEW_SESSION) -> dict[
     @provide_session
     def fetch_trigger_ids_with_non_task_associations(cls, session: Session = NEW_SESSION) -> set[str]:
         """Fetch all trigger IDs actively associated with non-task entities like assets and callbacks."""
+        from airflow.models.callback import Callback
+
         query = select(AssetWatcherModel.trigger_id).union_all(
Review Comment:
New import inside the method body. In Airflow core we generally keep imports
at module scope; local imports are only used when necessary (e.g., to avoid a
proven circular import) and should be documented like the existing `Job` import
in `assign_unassigned()`.
If there isn’t a circular import, move `Callback` to a top-level import
(preferably `from airflow.models.callback import Callback`); otherwise, add a
short comment explaining why this must remain a local import.
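A hypothetical sketch of the two styles this comment describes (the circular-dependency rationale below is an invented example for illustration, not a verified fact about these modules):

```python
# Style 1 (preferred when no import cycle exists): module-scope import.
# from airflow.models.callback import Callback

# Style 2 (only when a cycle is proven), documented in place the same way
# as the existing Job import in assign_unassigned():
def fetch_trigger_ids_with_non_task_associations(session=None):
    # Local import: airflow.models.callback imports this module at load
    # time (assumed here for illustration), so a top-level import would
    # create a circular dependency.
    from airflow.models.callback import Callback

    ...
```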
##########
airflow-core/tests/unit/serialization/test_encoders.py:
##########
@@ -0,0 +1,181 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pytest
+from sqlalchemy import delete
+
+from airflow.models.trigger import Trigger
+from airflow.providers.standard.triggers.file import FileDeleteTrigger
+from airflow.serialization.encoders import encode_trigger
+from airflow.serialization.enums import DagAttributeTypes as DAT, Encoding
+from airflow.triggers.base import BaseEventTrigger
+
+pytest.importorskip("airflow.providers.apache.kafka")
+from airflow.providers.apache.kafka.triggers.await_message import AwaitMessageTrigger
+
Review Comment:
`pytest.importorskip("airflow.providers.apache.kafka")` at module import
time will skip this entire test module when the Kafka provider isn't installed,
including the `FileDeleteTrigger` coverage (which should be
provider-independent) and the DB hash consistency checks.
Consider moving the `importorskip`/Kafka import into only the Kafka-specific
parametrized cases (e.g., conditionally add Kafka params, or mark those params
with skip) so the non-Kafka assertions still run in minimal test environments.
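One way to restructure this, sketched with hypothetical parameter names (only the provider/module paths come from the diff above; the test body is elided):

```python
import pytest

# Gate only the Kafka-specific params instead of skipping the whole module.
try:
    from airflow.providers.apache.kafka.triggers.await_message import AwaitMessageTrigger
except ImportError:
    AwaitMessageTrigger = None

TRIGGER_CASES = [
    # Provider-independent case always runs, even in minimal environments.
    pytest.param("FileDeleteTrigger", id="file-delete"),
    # Kafka case is individually skipped when the provider is missing.
    pytest.param(
        "AwaitMessageTrigger",
        id="kafka",
        marks=pytest.mark.skipif(
            AwaitMessageTrigger is None,
            reason="apache.kafka provider not installed",
        ),
    ),
]


@pytest.mark.parametrize("trigger_name", TRIGGER_CASES)
def test_encode_trigger_roundtrip(trigger_name):
    # Assertions elided; the point is that only the Kafka param is skipped.
    ...
```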
##########
airflow-core/src/airflow/serialization/encoders.py:
##########
@@ -162,6 +162,14 @@ def _ensure_serialized(d):
     if isinstance(trigger, dict):
         classpath = trigger["classpath"]
         kwargs = trigger["kwargs"]
+        # unwrap any kwargs that are themselves serialized objects, to avoid double-serialization in the trigger's own serialize() method.
+        unwrapped = {}
+        for k, v in kwargs.items():
+            if isinstance(v, dict) and Encoding.TYPE in v:
+                unwrapped[k] = BaseSerialization.deserialize(v)
+            else:
Review Comment:
The new unwrapping logic deserializes any kwarg value that is a dict
containing `Encoding.TYPE`, but `BaseSerialization.deserialize()` expects both
`Encoding.TYPE` and `Encoding.VAR` keys. If a trigger kwarg contains a regular
dict with a `__type` key (common pattern) but no `__var`, this will raise
`KeyError` during `encode_trigger()`.
To avoid crashing DAG processing/scheduling, tighten the sentinel check to
only treat dicts as BaseSerialization payloads when *both* `Encoding.TYPE` and
`Encoding.VAR` are present (and consider updating `_ensure_serialized()`
similarly so both directions use the same predicate).
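A minimal, self-contained sketch of the tightened predicate (the `Encoding` class here only mirrors `airflow.serialization.enums.Encoding`, and the sentinel key values are assumed; deserialization itself is stubbed out):

```python
from enum import Enum


class Encoding(str, Enum):
    # Stand-in for airflow.serialization.enums.Encoding; values assumed.
    TYPE = "__type"
    VAR = "__var"


def is_serialized_payload(value) -> bool:
    """Treat a dict as a BaseSerialization payload only when BOTH sentinel
    keys are present; a plain dict that merely happens to contain a
    __type key is left untouched instead of crashing deserialize()."""
    return isinstance(value, dict) and Encoding.TYPE in value and Encoding.VAR in value


def unwrap_kwargs(kwargs: dict) -> dict:
    # Real code would call BaseSerialization.deserialize(v); stubbed here
    # so only the sentinel check is exercised.
    return {k: ("<deserialized>" if is_serialized_payload(v) else v) for k, v in kwargs.items()}
```

Sharing one predicate between `encode_trigger()` and `_ensure_serialized()` keeps both directions symmetric, as the comment suggests.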
##########
airflow-core/tests/unit/dag_processing/test_collection.py:
##########
@@ -182,6 +182,94 @@ def test_add_asset_trigger_references(
         asset_model = session.scalars(select(AssetModel)).one()
         assert len(asset_model.triggers) == expected_num_triggers
+    @pytest.mark.usefixtures("testing_dag_bundle")
+    def test_add_asset_trigger_references_hash_consistency(self, dag_maker, session):
+        """Trigger hash from the DAG-parsed path must equal the hash computed
+        from the DB-stored Trigger row. A mismatch causes the scheduler to
+        recreate trigger rows on every heartbeat.
+        """
+        from airflow.models.trigger import Trigger
+        from airflow.serialization.encoders import encode_trigger
+        from airflow.triggers.base import BaseEventTrigger
+
+        trigger = FileDeleteTrigger(filepath="/tmp/test.txt", poke_interval=5.0)
+        asset = Asset(
+            "test_hash_consistency_asset",
+            watchers=[AssetWatcher(name="file_watcher", trigger=trigger)],
+        )
+
+        with dag_maker(dag_id="test_hash_consistency_dag", schedule=[asset]) as dag:
+            EmptyOperator(task_id="mytask")
+
+        dags = {dag.dag_id: LazyDeserializedDAG.from_dag(dag)}
+        orm_dags = DagModelOperation(dags, "testing", None).add_dags(session=session)
+        orm_dags[dag.dag_id].is_paused = False
+
+        asset_op = AssetModelOperation.collect(dags)
+        orm_assets = asset_op.sync_assets(session=session)
+        session.flush()
+
+        asset_op.add_dag_asset_references(orm_dags, orm_assets, session=session)
+        asset_op.activate_assets_if_possible(orm_assets.values(), session=session)
+        asset_op.add_asset_trigger_references(orm_assets, session=session)
+        session.flush()
+
+        # DAG-side hash (same computation as add_asset_trigger_references line 1025)
+        encoded = encode_trigger(trigger)
+        dag_hash = BaseEventTrigger.hash(encoded["classpath"], encoded["kwargs"])
+
+        # DB-side: expire and re-load the Trigger row to force a real DB read
+        asset_model = session.scalars(select(AssetModel)).one()
+        assert len(asset_model.triggers) == 1
+        orm_trigger = asset_model.triggers[0]
+        trigger_id = orm_trigger.id
+        session.expire(orm_trigger)
+        reloaded = session.get(Trigger, trigger_id)
+
+        # DB-side hash (same computation as add_asset_trigger_references line 1033)
+        db_hash = BaseEventTrigger.hash(reloaded.classpath, reloaded.kwargs)
+
+        assert dag_hash == db_hash
+
+    @pytest.mark.usefixtures("testing_dag_bundle")
+    def test_add_asset_trigger_references_idempotent(self, dag_maker, session):
+        """Calling add_asset_trigger_references twice with the same trigger
+        must not create duplicate rows.
+        """
+        from airflow.models.trigger import Trigger
+
+        trigger = FileDeleteTrigger(filepath="/tmp/test.txt", poke_interval=5.0)
Review Comment:
New inline import in the test body; since this module already imports most
dependencies at the top, please move `Trigger` to module scope for
consistency/readability (unless there’s a specific reason to keep it local).
##########
airflow-core/tests/unit/dag_processing/test_collection.py:
##########
@@ -182,6 +182,94 @@ def test_add_asset_trigger_references(
         asset_model = session.scalars(select(AssetModel)).one()
         assert len(asset_model.triggers) == expected_num_triggers
+    @pytest.mark.usefixtures("testing_dag_bundle")
+    def test_add_asset_trigger_references_hash_consistency(self, dag_maker, session):
+        """Trigger hash from the DAG-parsed path must equal the hash computed
+        from the DB-stored Trigger row. A mismatch causes the scheduler to
+        recreate trigger rows on every heartbeat.
+        """
+        from airflow.models.trigger import Trigger
+        from airflow.serialization.encoders import encode_trigger
+        from airflow.triggers.base import BaseEventTrigger
+
Review Comment:
These new inline imports are unnecessary here since the file already imports its dependencies at module level; they obscure the module's dependency list and can trip lint rules. Please move the `Trigger`, `encode_trigger`, and `BaseEventTrigger` imports
to the top of the module (or otherwise document why they must remain local).
##########
airflow-core/src/airflow/models/trigger.py:
##########
@@ -408,6 +409,8 @@ def get_sorted_triggers(
         :param queues: The optional set of trigger queues to filter triggers by.
         :param session: The database session.
         """
+        from airflow.models.callback import Callback
+
Review Comment:
Same as above: this new `Callback` import is inside the method body without
a circular-import justification. If a local import is required here, please add
a brief comment stating the reason; otherwise move the import to module scope
to match the prevailing style in this module.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]