dstandish commented on code in PR #59006:
URL: https://github.com/apache/airflow/pull/59006#discussion_r2589464933


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1673,19 +1674,55 @@ def _do_scheduling(self, session: Session) -> int:
 
         return num_queued_tis
 
+    def _create_dagruns_for_partitioned_asset_dags(self, session: Session):
+        asset_partition_dags: set[str] = set()
+        apdrs: Iterable[AssetPartitionDagRun] = session.scalars(
+            
select(AssetPartitionDagRun).where(AssetPartitionDagRun.created_dag_run_id.is_(None))
+        )
+        for apdr in apdrs:
+            asset_partition_dags.add(apdr.target_dag_id)
+            dag = _get_current_dag(dag_id=apdr.target_dag_id, session=session)
+            if not dag:
+                self.log.error("DAG '%s' not found in serialized_dag table", 
apdr.target_dag_id)
+                continue
+
+            run_after = timezone.utcnow()
+            dag_run = dag.create_dagrun(
+                run_id=DagRun.generate_run_id(
+                    run_type=DagRunType.ASSET_TRIGGERED, logical_date=None, 
run_after=run_after
+                ),
+                logical_date=None,
+                data_interval=None,
+                partition_key=apdr.partition_key,
+                run_after=run_after,
+                run_type=DagRunType.ASSET_TRIGGERED,
+                triggered_by=DagRunTriggeredByType.ASSET,
+                state=DagRunState.QUEUED,
+                creating_job_id=self.job.id,
+                session=session,
+            )
+            session.flush()
+            apdr.created_dag_run_id = dag_run.id
+            session.flush()
+
+        return asset_partition_dags
+
     @retry_db_transaction
     def _create_dagruns_for_dags(self, guard: CommitProhibitorGuard, session: 
Session) -> None:
         """Find Dag Models needing DagRuns and Create Dag Runs with retries in 
case of OperationalError."""
+        asset_partition_dags: set[str] = 
self._create_dagruns_for_partitioned_asset_dags(session)
+
         query, triggered_date_by_dag = DagModel.dags_needing_dagruns(session)
         all_dags_needing_dag_runs = set(query.all())
         asset_triggered_dags = [
             dag for dag in all_dags_needing_dag_runs if dag.dag_id in 
triggered_date_by_dag
         ]
         non_asset_dags = 
all_dags_needing_dag_runs.difference(asset_triggered_dags)
+        non_asset_dags = set(x for x in non_asset_dags if x.dag_id not in 
asset_partition_dags)

Review Comment:
   I think that using descriptive variable names in list comprehensions is 
actually counterproductive and goes against readability because of the noise it 
generates
   
   with this
   
   ```
   set(x for x in non_asset_dags if x.dag_id not in asset_partition_dags)
   ```
   
   if you want to know what `x` is, you just look at what 
`asset_partition_dags` is.
   
   some prefer using a more associated letter such as `d` for dag but i don't 
really mind x personally since it's just a placeholder for "thing in 
asset_partition_dags" and people understand that.
   
   but yeah this has come up before and i think the general consensus was on 
the side of "don't use descriptive variable names here".



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to