Lee-W commented on code in PR #59006:
URL: https://github.com/apache/airflow/pull/59006#discussion_r2587314419


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1673,19 +1674,55 @@ def _do_scheduling(self, session: Session) -> int:
 
         return num_queued_tis
 
+    def _create_dagruns_for_partitioned_asset_dags(self, session: Session):
+        asset_partition_dags: set[str] = set()
+        apdrs: Iterable[AssetPartitionDagRun] = session.scalars(
+            select(AssetPartitionDagRun).where(AssetPartitionDagRun.created_dag_run_id.is_(None))
+        )
+        for apdr in apdrs:
+            asset_partition_dags.add(apdr.target_dag_id)
+            dag = _get_current_dag(dag_id=apdr.target_dag_id, session=session)
+            if not dag:
+                self.log.error("DAG '%s' not found in serialized_dag table", apdr.target_dag_id)
+                continue
+
+            run_after = timezone.utcnow()
+            dag_run = dag.create_dagrun(
+                run_id=DagRun.generate_run_id(
+                    run_type=DagRunType.ASSET_TRIGGERED, logical_date=None, run_after=run_after
+                ),
+                logical_date=None,
+                data_interval=None,
+                partition_key=apdr.partition_key,
+                run_after=run_after,
+                run_type=DagRunType.ASSET_TRIGGERED,
+                triggered_by=DagRunTriggeredByType.ASSET,
+                state=DagRunState.QUEUED,
+                creating_job_id=self.job.id,
+                session=session,
+            )
+            session.flush()
+            apdr.created_dag_run_id = dag_run.id
+            session.flush()
+
+        return asset_partition_dags
+
     @retry_db_transaction
     def _create_dagruns_for_dags(self, guard: CommitProhibitorGuard, session: Session) -> None:
         """Find Dag Models needing DagRuns and Create Dag Runs with retries in case of OperationalError."""
+        asset_partition_dags: set[str] = self._create_dagruns_for_partitioned_asset_dags(session)
+
         query, triggered_date_by_dag = DagModel.dags_needing_dagruns(session)
         all_dags_needing_dag_runs = set(query.all())
         asset_triggered_dags = [
             dag for dag in all_dags_needing_dag_runs if dag.dag_id in triggered_date_by_dag
         ]
         non_asset_dags = all_dags_needing_dag_runs.difference(asset_triggered_dags)
+        non_asset_dags = set(x for x in non_asset_dags if x.dag_id not in asset_partition_dags)

Review Comment:
   ```suggestion
           non_asset_dag_ids = set(x for x in non_asset_dags if x.dag_id not in asset_partition_dags)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to