Lee-W commented on code in PR #59006:
URL: https://github.com/apache/airflow/pull/59006#discussion_r2587350097


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1673,19 +1674,55 @@ def _do_scheduling(self, session: Session) -> int:
 
         return num_queued_tis
 
+    def _create_dagruns_for_partitioned_asset_dags(self, session: Session):
+        asset_partition_dags: set[str] = set()
+        apdrs: Iterable[AssetPartitionDagRun] = session.scalars(
+            select(AssetPartitionDagRun).where(AssetPartitionDagRun.created_dag_run_id.is_(None))
+        )
+        for apdr in apdrs:
+            asset_partition_dags.add(apdr.target_dag_id)
+            dag = _get_current_dag(dag_id=apdr.target_dag_id, session=session)
+            if not dag:
+                self.log.error("DAG '%s' not found in serialized_dag table", apdr.target_dag_id)
+                continue
+
+            run_after = timezone.utcnow()
+            dag_run = dag.create_dagrun(
+                run_id=DagRun.generate_run_id(
+                    run_type=DagRunType.ASSET_TRIGGERED, logical_date=None, run_after=run_after
+                ),
+                logical_date=None,
+                data_interval=None,
+                partition_key=apdr.partition_key,
+                run_after=run_after,
+                run_type=DagRunType.ASSET_TRIGGERED,
+                triggered_by=DagRunTriggeredByType.ASSET,
+                state=DagRunState.QUEUED,
+                creating_job_id=self.job.id,
+                session=session,
+            )
+            session.flush()
+            apdr.created_dag_run_id = dag_run.id
+            session.flush()
+
+        return asset_partition_dags
+
     @retry_db_transaction
     def _create_dagruns_for_dags(self, guard: CommitProhibitorGuard, session: Session) -> None:
         """Find Dag Models needing DagRuns and Create Dag Runs with retries in 
case of OperationalError."""
+        asset_partition_dags: set[str] = self._create_dagruns_for_partitioned_asset_dags(session)
+
         query, triggered_date_by_dag = DagModel.dags_needing_dagruns(session)
         all_dags_needing_dag_runs = set(query.all())
         asset_triggered_dags = [
             dag for dag in all_dags_needing_dag_runs if dag.dag_id in triggered_date_by_dag
         ]
         non_asset_dags = all_dags_needing_dag_runs.difference(asset_triggered_dags)
+        non_asset_dags = set(x for x in non_asset_dags if x.dag_id not in asset_partition_dags)
         self._create_dag_runs(non_asset_dags, session)
         if asset_triggered_dags:
             self._create_dag_runs_asset_triggered(
-                dag_models=asset_triggered_dags,
+                dag_models=[x for x in asset_triggered_dags if x.dag_id not in asset_partition_dags],

Review Comment:
   ```suggestion
                dag_models=[dag for dag in asset_triggered_dags if dag.dag_id not in asset_partition_dags],
   ```
   
   Should we filter it at https://github.com/apache/airflow/pull/59006/files#diff-072241d81275cd4b4b867f51025e9dca800610069305cffeb4b77ad45e135557R1717 instead?
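
   For context, a rough sketch of what that alternative could look like, assuming the linked anchor points at the `asset_triggered_dags` comprehension (this restates the reviewer's idea using names from the diff; it is not code from the PR):

   ```python
   # Hypothetical: apply the asset_partition_dags filter once, where
   # asset_triggered_dags is built, so later call sites need no extra filtering.
   asset_triggered_dags = [
       dag
       for dag in all_dags_needing_dag_runs
       if dag.dag_id in triggered_date_by_dag and dag.dag_id not in asset_partition_dags
   ]
   ```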



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1673,19 +1674,55 @@ def _do_scheduling(self, session: Session) -> int:
 
         return num_queued_tis
 
+    def _create_dagruns_for_partitioned_asset_dags(self, session: Session):
+        asset_partition_dags: set[str] = set()
+        apdrs: Iterable[AssetPartitionDagRun] = session.scalars(
+            select(AssetPartitionDagRun).where(AssetPartitionDagRun.created_dag_run_id.is_(None))
+        )
+        for apdr in apdrs:
+            asset_partition_dags.add(apdr.target_dag_id)
+            dag = _get_current_dag(dag_id=apdr.target_dag_id, session=session)
+            if not dag:
+                self.log.error("DAG '%s' not found in serialized_dag table", apdr.target_dag_id)
+                continue
+
+            run_after = timezone.utcnow()
+            dag_run = dag.create_dagrun(
+                run_id=DagRun.generate_run_id(
+                    run_type=DagRunType.ASSET_TRIGGERED, logical_date=None, run_after=run_after
+                ),
+                logical_date=None,
+                data_interval=None,
+                partition_key=apdr.partition_key,
+                run_after=run_after,
+                run_type=DagRunType.ASSET_TRIGGERED,
+                triggered_by=DagRunTriggeredByType.ASSET,
+                state=DagRunState.QUEUED,
+                creating_job_id=self.job.id,
+                session=session,
+            )
+            session.flush()
+            apdr.created_dag_run_id = dag_run.id
+            session.flush()
+
+        return asset_partition_dags
+
     @retry_db_transaction
     def _create_dagruns_for_dags(self, guard: CommitProhibitorGuard, session: Session) -> None:
         """Find Dag Models needing DagRuns and Create Dag Runs with retries in 
case of OperationalError."""
+        asset_partition_dags: set[str] = self._create_dagruns_for_partitioned_asset_dags(session)
+
         query, triggered_date_by_dag = DagModel.dags_needing_dagruns(session)
         all_dags_needing_dag_runs = set(query.all())
         asset_triggered_dags = [
             dag for dag in all_dags_needing_dag_runs if dag.dag_id in triggered_date_by_dag
         ]
         non_asset_dags = all_dags_needing_dag_runs.difference(asset_triggered_dags)
+        non_asset_dags = set(x for x in non_asset_dags if x.dag_id not in asset_partition_dags)

Review Comment:
   ```suggestion
           non_asset_dags = {
               dag
               # filter asset triggered Dags
            for dag in all_dags_needing_dag_runs.difference(asset_triggered_dags)
               # filter asset partition triggered Dag
               if dag.dag_id not in asset_partition_dags
           }
   ```
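
   For reference, a minimal self-contained check (using a hypothetical `FakeDagModel` stand-in, since only `dag_id` matters for the filter) showing that the suggested single set comprehension is behavior-equivalent to the original two-step filter:

   ```python
   from dataclasses import dataclass


   @dataclass(frozen=True)
   class FakeDagModel:
       """Hypothetical stand-in for DagModel; hashable so it can live in sets."""

       dag_id: str


   all_dags_needing_dag_runs = {FakeDagModel("a"), FakeDagModel("b"), FakeDagModel("c")}
   asset_triggered_dags = [FakeDagModel("a")]
   asset_partition_dags = {"b"}

   # Original two-step version: set difference, then a generator-based filter.
   two_step = all_dags_needing_dag_runs.difference(asset_triggered_dags)
   two_step = set(x for x in two_step if x.dag_id not in asset_partition_dags)

   # Suggested single set comprehension combining both filters.
   one_step = {
       dag
       for dag in all_dags_needing_dag_runs.difference(asset_triggered_dags)
       if dag.dag_id not in asset_partition_dags
   }

   assert two_step == one_step == {FakeDagModel("c")}
   ```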



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
