Lee-W commented on code in PR #59006:
URL: https://github.com/apache/airflow/pull/59006#discussion_r2591345657
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1673,19 +1674,53 @@ def _do_scheduling(self, session: Session) -> int:
         return num_queued_tis
+    def _create_dagruns_for_partitioned_asset_dags(self, session: Session) -> set[str]:
+        partition_dag_ids: set[str] = set()
+        apdrs: Iterable[AssetPartitionDagRun] = session.scalars(
+            select(AssetPartitionDagRun).where(AssetPartitionDagRun.created_dag_run_id.is_(None))
+        )
+        for apdr in apdrs:
+            partition_dag_ids.add(apdr.target_dag_id)
+            dag = _get_current_dag(dag_id=apdr.target_dag_id, session=session)
+            if not dag:
+                self.log.error("Dag '%s' not found in serialized_dag table", apdr.target_dag_id)
+                continue
+
+            run_after = timezone.utcnow()
+            dag_run = dag.create_dagrun(
+                run_id=DagRun.generate_run_id(
+                    run_type=DagRunType.ASSET_TRIGGERED, logical_date=None, run_after=run_after
+                ),
+                logical_date=None,
+                data_interval=None,
+                partition_key=apdr.partition_key,
+                run_after=run_after,
+                run_type=DagRunType.ASSET_TRIGGERED,
+                triggered_by=DagRunTriggeredByType.ASSET,
+                state=DagRunState.QUEUED,
+                creating_job_id=self.job.id,
+                session=session,
+            )
+            session.flush()
+            apdr.created_dag_run_id = dag_run.id
+            session.flush()
+
+        return partition_dag_ids
+
+
     @retry_db_transaction
     def _create_dagruns_for_dags(self, guard: CommitProhibitorGuard, session: Session) -> None:
         """Find Dag Models needing DagRuns and Create Dag Runs with retries in case of OperationalError."""
+        partition_dag_ids: set[str] = self._create_dagruns_for_partitioned_asset_dags(session)
+
         query, triggered_date_by_dag = DagModel.dags_needing_dagruns(session)
         all_dags_needing_dag_runs = set(query.all())
-        asset_triggered_dags = [
-            dag for dag in all_dags_needing_dag_runs if dag.dag_id in triggered_date_by_dag
-        ]
+        asset_triggered_dags = [d for d in all_dags_needing_dag_runs if d.dag_id in triggered_date_by_dag]
         non_asset_dags = all_dags_needing_dag_runs.difference(asset_triggered_dags)
+        non_asset_dags = set(d for d in non_asset_dags if d.dag_id not in partition_dag_ids)
Review Comment:
```suggestion
        non_asset_dags = {
            d
            # filter asset-triggered Dags
            for d in all_dags_needing_dag_runs.difference(asset_triggered_dags)
            # filter asset partition triggered Dags
            if d.dag_id not in partition_dag_ids
        }
```
I'm OK with the descriptive approach, but this might still be better. Needing two separate operations to make `non_asset_dags` contain only the truly non-asset Dags looks a bit odd to me. Non-blocking, though.
##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -8140,3 +8137,61 @@ def test_mark_backfills_completed(dag_maker, session):
     runner._mark_backfills_complete()
     b = session.get(Backfill, b.id)
     assert b.completed_at.timestamp() > 0
+
+
+def test_when_dag_run_has_partition_and_downstreams_listening_then_tables_populated(
+    dag_maker,
+    session,
+):
+    asset = Asset(name="hello")
+    with dag_maker(dag_id="asset_event_tester", schedule=None, session=session) as dag:
+        EmptyOperator(task_id="hi", outlets=[asset])
+    dag1_id = dag.dag_id
+    dr = dag_maker.create_dagrun(partition_key="abc123", session=session)
+    assert dr.partition_key == "abc123"
+    [ti] = dr.get_task_instances(session=session)
+    session.commit()
+
+    with dag_maker(
+        dag_id="asset_event_listener",
+        schedule=PartitionedAssetTimetable(asset, IdentityMapper()),
Review Comment:
Not related to this PR, but should we make `PartitionedAssetTimetable` take keyword-only arguments? That generally makes it easier for us to extend it later.
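Roughly what I have in mind, as a sketch only (the parameter names below are placeholders, not necessarily the actual signature):
```python
# Sketch of the keyword-only form; "asset" and "mapper" are placeholder
# parameter names, not necessarily what PartitionedAssetTimetable uses today.
class PartitionedAssetTimetable:
    def __init__(self, *, asset, mapper) -> None:
        self.asset = asset
        self.mapper = mapper


# Call sites would then spell out the arguments, e.g.
#     schedule=PartitionedAssetTimetable(asset=asset, mapper=IdentityMapper())
# and new parameters can be added later without breaking positional callers.
```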