Lee-W commented on code in PR #59006:
URL: https://github.com/apache/airflow/pull/59006#discussion_r2591345657


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1673,19 +1674,53 @@ def _do_scheduling(self, session: Session) -> int:
 
         return num_queued_tis
 
+    def _create_dagruns_for_partitioned_asset_dags(self, session: Session) -> 
set[str]:
+        partition_dag_ids: set[str] = set()
+        apdrs: Iterable[AssetPartitionDagRun] = session.scalars(
+            
select(AssetPartitionDagRun).where(AssetPartitionDagRun.created_dag_run_id.is_(None))
+        )
+        for apdr in apdrs:
+            partition_dag_ids.add(apdr.target_dag_id)
+            dag = _get_current_dag(dag_id=apdr.target_dag_id, session=session)
+            if not dag:
+                self.log.error("Dag '%s' not found in serialized_dag table", 
apdr.target_dag_id)
+                continue
+
+            run_after = timezone.utcnow()
+            dag_run = dag.create_dagrun(
+                run_id=DagRun.generate_run_id(
+                    run_type=DagRunType.ASSET_TRIGGERED, logical_date=None, 
run_after=run_after
+                ),
+                logical_date=None,
+                data_interval=None,
+                partition_key=apdr.partition_key,
+                run_after=run_after,
+                run_type=DagRunType.ASSET_TRIGGERED,
+                triggered_by=DagRunTriggeredByType.ASSET,
+                state=DagRunState.QUEUED,
+                creating_job_id=self.job.id,
+                session=session,
+            )
+            session.flush()
+            apdr.created_dag_run_id = dag_run.id
+            session.flush()
+
+        return partition_dag_ids
+
     @retry_db_transaction
     def _create_dagruns_for_dags(self, guard: CommitProhibitorGuard, session: 
Session) -> None:
         """Find Dag Models needing DagRuns and Create Dag Runs with retries in 
case of OperationalError."""
+        partition_dag_ids: set[str] = 
self._create_dagruns_for_partitioned_asset_dags(session)
+
         query, triggered_date_by_dag = DagModel.dags_needing_dagruns(session)
         all_dags_needing_dag_runs = set(query.all())
-        asset_triggered_dags = [
-            dag for dag in all_dags_needing_dag_runs if dag.dag_id in 
triggered_date_by_dag
-        ]
+        asset_triggered_dags = [d for d in all_dags_needing_dag_runs if 
d.dag_id in triggered_date_by_dag]
         non_asset_dags = 
all_dags_needing_dag_runs.difference(asset_triggered_dags)
+        non_asset_dags = set(d for d in non_asset_dags if d.dag_id not in 
partition_dag_ids)

Review Comment:
   ```suggestion
           non_asset_dags = {
               d
               # filter asset-triggered Dags
               for d in 
all_dags_needing_dag_runs.difference(asset_triggered_dags)
               # filter asset partition triggered Dags
               if d.dag_id not in partition_dag_ids
           }
   ```
   
   I'm OK with the descriptive approach, but this might still be better. Doing 
two operations to make `non_asset_dags` contain only the real non-asset Dags 
looks a bit weird to me. This is non-blocking, though.



##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -8140,3 +8137,61 @@ def test_mark_backfills_completed(dag_maker, session):
     runner._mark_backfills_complete()
     b = session.get(Backfill, b.id)
     assert b.completed_at.timestamp() > 0
+
+
+def 
test_when_dag_run_has_partition_and_downstreams_listening_then_tables_populated(
+    dag_maker,
+    session,
+):
+    asset = Asset(name="hello")
+    with dag_maker(dag_id="asset_event_tester", schedule=None, 
session=session) as dag:
+        EmptyOperator(task_id="hi", outlets=[asset])
+    dag1_id = dag.dag_id
+    dr = dag_maker.create_dagrun(partition_key="abc123", session=session)
+    assert dr.partition_key == "abc123"
+    [ti] = dr.get_task_instances(session=session)
+    session.commit()
+
+    with dag_maker(
+        dag_id="asset_event_listener",
+        schedule=PartitionedAssetTimetable(asset, IdentityMapper()),

Review Comment:
   Not related to this PR, but should we make the arguments of 
`PartitionedAssetTimetable` keyword-only? This generally makes it easier for us 
to extend it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to