This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 68afaa6e59e fix(Asset-Partition): use timetable_partitioned to distinguish whether a Dag is partitioned (#62864)
68afaa6e59e is described below
commit 68afaa6e59e180c16489527b4ffcc6979c75985d
Author: Wei Lee <[email protected]>
AuthorDate: Mon Mar 9 21:49:16 2026 +0800
fix(Asset-Partition): use timetable_partitioned to distinguish whether a Dag is partitioned (#62864)
---
airflow-core/src/airflow/assets/manager.py | 9 ++-------
airflow-core/src/airflow/jobs/scheduler_job_runner.py | 2 +-
2 files changed, 3 insertions(+), 8 deletions(-)
diff --git a/airflow-core/src/airflow/assets/manager.py b/airflow-core/src/airflow/assets/manager.py
index 9c778d4a50e..9c287209172 100644
--- a/airflow-core/src/airflow/assets/manager.py
+++ b/airflow-core/src/airflow/assets/manager.py
@@ -348,10 +348,7 @@ class AssetManager(LoggingMixin):
if not dags_to_queue:
return None
- # TODO: AIP-76 there may be a better way to identify that timetable is partition-driven
- # https://github.com/apache/airflow/issues/58445
- partition_dags = [x for x in dags_to_queue if x.timetable_summary == "Partitioned Asset"]
-
+ partition_dags = [x for x in dags_to_queue if x.timetable_partitioned is True]
cls._queue_partitioned_dags(
asset_id=asset_id,
partition_dags=partition_dags,
@@ -361,7 +358,6 @@ class AssetManager(LoggingMixin):
)
non_partitioned_dags = dags_to_queue.difference(partition_dags) # don't double process
-
if not non_partitioned_dags:
return None
@@ -388,9 +384,8 @@ class AssetManager(LoggingMixin):
) -> None:
if partition_dags and not partition_key:
# TODO: AIP-76 how to best ensure users can see this? Probably add Log record.
- # https://github.com/apache/airflow/issues/59060
log.warning(
- "Listening dags are partition-aware but run has no partition key",
+ "Listening Dags are partition-aware but run has no partition key",
listening_dags=[x.dag_id for x in partition_dags],
asset_id=asset_id,
run_id=event.source_run_id,
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 9f9aa78cd74..24dbf8d1e98 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -2090,7 +2090,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
active_runs=active_runs_of_dags.get(dag_model.dag_id),
)
continue
- if dag_model.next_dagrun is None:
+ if dag_model.next_dagrun is None and dag_model.timetable_partitioned is False:
self.log.error(
"dag_model.next_dagrun is None; expected datetime",
dag_id=dag_model.dag_id,