This is an automated email from the ASF dual-hosted git repository. rahulvats pushed a commit to branch py-client-sync in repository https://gitbox.apache.org/repos/asf/airflow.git
commit bb3f74bb09ba05bb574ac5c129f53e833a15994d Author: Shubham Gondane <[email protected]> AuthorDate: Tue Mar 24 03:55:12 2026 -0700 Fix partitioned asset events incorrectly triggering non-partition-aware Dags (#63848) Co-authored-by: Wei Lee <[email protected]> --- airflow-core/src/airflow/assets/manager.py | 2 +- airflow-core/tests/unit/assets/test_manager.py | 23 +++++++++++++++++++++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/assets/manager.py b/airflow-core/src/airflow/assets/manager.py index a599e79018e..dca3db9b181 100644 --- a/airflow-core/src/airflow/assets/manager.py +++ b/airflow-core/src/airflow/assets/manager.py @@ -357,7 +357,7 @@ class AssetManager(LoggingMixin): ) non_partitioned_dags = dags_to_queue.difference(partition_dags) # don't double process - if not non_partitioned_dags: + if not non_partitioned_dags or partition_key is not None: return None # Possible race condition: if multiple dags or multiple (usually diff --git a/airflow-core/tests/unit/assets/test_manager.py b/airflow-core/tests/unit/assets/test_manager.py index 82477042d86..9ab50bcb3e5 100644 --- a/airflow-core/tests/unit/assets/test_manager.py +++ b/airflow-core/tests/unit/assets/test_manager.py @@ -294,3 +294,26 @@ class TestAssetManager: queued_id = session.scalar(select(AssetDagRunQueue.target_dag_id)) assert queued_id == "stale_dag" + + @pytest.mark.usefixtures("clear_assets", "testing_dag_bundle") + def test_partitioned_asset_event_does_not_trigger_non_partitioned_dag(self, session, mock_task_instance): + """partitioned asset events (events with partition key) must not queue non-partition-aware Dags.""" + asm = AssetModel(uri="test://asset/", name="test_asset", group="asset") + session.add(asm) + dag = DagModel( + dag_id="consumer_dag", is_paused=False, bundle_name="testing", timetable_partitioned=False + ) + session.add(dag) + asm.scheduled_dags = [DagScheduleAssetReference(dag_id=dag.dag_id)] + session.execute(delete(AssetDagRunQueue)) + session.flush() + + AssetManager.register_asset_change( + task_instance=mock_task_instance, + asset=Asset(uri="test://asset/", name="test_asset"), + session=session, + partition_key="2024-01-01T00:00:00+00:00", + ) + session.flush() + + assert session.scalar(select(func.count()).select_from(AssetDagRunQueue)) == 0
