uranusjr commented on code in PR #64571:
URL: https://github.com/apache/airflow/pull/64571#discussion_r3194135356


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1827,40 +1845,165 @@ def _do_scheduling(self, session: Session) -> int:
 
         return num_queued_tis
 
+    def _check_rollup_asset_status(
+        self,
+        *,
+        asset_id: int,
+        apdr: AssetPartitionDagRun,
+        mapper: RollupMapper,
+        actual_by_asset: dict[int, set[str]],
+    ) -> bool:
+        if TYPE_CHECKING:
+            assert apdr.partition_key is not None
+        expected = mapper.to_upstream(apdr.partition_key)
+        return expected.issubset(actual_by_asset.get(asset_id, set()))
+
+    def _resolve_asset_partition_status(
+        self,
+        *,
+        session: Session,
+        asset_id: int,
+        name: str,
+        uri: str,
+        apdr: AssetPartitionDagRun,
+        timetable: PartitionedAssetTimetable,
+        actual_by_asset: dict[int, set[str]],
+    ) -> bool:
+        """
+        Return whether *asset_id* has been satisfied for *apdr*.
+
+        Non-rollup assets resolve to ``True`` because the caller only invokes
+        this for assets that already have at least one logged event for *APDR*
+        (see :class:`~airflow.models.asset.PartitionedAssetKeyLog`), which is
+        the non-rollup contract for "received". Rollup assets defer to
+        :meth:`_check_rollup_asset_status` for the upstream-window check.
+
+        A misconfigured mapper that raises returns ``False`` (treated as
+        not-yet-satisfied) and an audit log entry is written so the operator
+        can see why the Dag run is being held in the UI.
+        """
+        try:
+            mapper = timetable.get_partition_mapper(name=name, uri=uri)
+            if not mapper.is_rollup:
+                return True
+            return self._check_rollup_asset_status(
+                asset_id=asset_id,
+                apdr=apdr,
+                mapper=cast("RollupMapper", mapper),
+                actual_by_asset=actual_by_asset,
+            )
+        except Exception as err:
+            self.log.exception(
+                "Failed to evaluate rollup status for asset; treating as 
not-yet-satisfied. "
+                "This likely indicates a misconfigured partition mapper.",
+                dag_id=apdr.target_dag_id,
+                partition_key=apdr.partition_key,
+                asset_name=name,
+                asset_uri=uri,
+            )
+            session.add(
+                Log(
+                    event="failed to evaluate rollup status",
+                    dag_id=apdr.target_dag_id,
+                    extra=(
+                        "Could not evaluate rollup status for partition_key "
+                        f"'{apdr.partition_key}' on asset (name='{name}', 
uri='{uri}') "
+                        f"in target Dag '{apdr.target_dag_id}'. This likely 
indicates "
+                        "that the rollup mapper is misconfigured or does not 
support "
+                        f"this partition key.\n{type(err).__name__}: {err}"
+                    ),
+                )
+            )
+            return False
+
     def _create_dagruns_for_partitioned_asset_dags(self, session: Session) -> 
set[str]:
+        """
+        Create Dag runs for pending :class:`AssetPartitionDagRun` rows whose 
partition is satisfied.
+
+        Returns the set of ``dag_id`` strings that received a new 
partition-driven Dag run in this
+        tick. The caller (:meth:`_create_dagruns_for_dags`) uses this set to 
exclude the same Dags
+        from the standard schedule-driven and asset-triggered creation paths 
so a single Dag never
+        gets two Dag runs for the same tick when it appears in more than one 
creation path. We
+        return ``dag_id`` strings rather than full Dag/DagRun objects because 
the only downstream
+        use is membership lookup, and a heavier return type would just be 
discarded.
+        """
+        # Cap per-tick work so the scheduler transaction stays bounded and 
other
+        # scheduling work isn't starved. Remaining APDRs drain across 
subsequent ticks.
+        # Note: with strict FIFO ordering, persistently-unsatisfied APDRs at 
the head
+        # of the queue would block newer ones; switch to updated_at-based 
ordering if
+        # that becomes an issue.
+        pending_apdrs = session.scalars(
+            select(AssetPartitionDagRun)
+            .join(DagModel, DagModel.dag_id == 
AssetPartitionDagRun.target_dag_id)
+            .where(
+                AssetPartitionDagRun.created_dag_run_id.is_(None),
+                DagModel.is_stale.is_(False),
+            )
+            .order_by(AssetPartitionDagRun.created_at)
+            .limit(MAX_PARTITION_DAG_RUNS_PER_TICK)
+        ).all()
+        if not pending_apdrs:
+            return set()
+
         partition_dag_ids: set[str] = set()
+        pending_apdr_ids = [apdr.id for apdr in pending_apdrs]
 
-        evaluator = AssetEvaluator(session)
-        for apdr in session.scalars(
-            
select(AssetPartitionDagRun).where(AssetPartitionDagRun.created_dag_run_id.is_(None))
+        # Pre-fetch all required serialized Dags in one query.
+        dag_ids = list({apdr.target_dag_id for apdr in pending_apdrs if 
apdr.target_dag_id})
+        # {"dag_id": Serialized Dag}
+        serialized_dags: dict[str, SerializedDAG] = {}
+        for serdag in 
SerializedDagModel.get_latest_serialized_dags(dag_ids=dag_ids, session=session):
+            try:
+                serdag.load_op_links = False
+                serialized_dags[serdag.dag_id] = serdag.dag
+            except Exception:
+                self.log.exception("Failed to deserialize Dag '%s'", 
serdag.dag_id)
+
+        # {apdr_id: {asset_id: set(source_key, ...)}
+        source_key_by_asset_per_apdr: dict[int, dict[int, set[str]]] = 
defaultdict(lambda: defaultdict(set))
+        # {apdr_id: {asset_id: (asset_name, asset_uri)}
+        asset_info_per_apdr: dict[int, dict[int, tuple[str, str]]] = 
defaultdict(dict)
+        for apdr_id, asset_id, source_key, name, uri in session.execute(
+            select(
+                PartitionedAssetKeyLog.asset_partition_dag_run_id,
+                PartitionedAssetKeyLog.asset_id,
+                PartitionedAssetKeyLog.source_partition_key,
+                AssetModel.name,
+                AssetModel.uri,
+            )
+            .join(AssetModel, AssetModel.id == PartitionedAssetKeyLog.asset_id)
+            
.where(PartitionedAssetKeyLog.asset_partition_dag_run_id.in_(pending_apdr_ids))
         ):
+            source_key_by_asset_per_apdr[apdr_id][asset_id].add(source_key)
+            asset_info_per_apdr[apdr_id][asset_id] = (name, uri)
+
+        evaluator = AssetEvaluator(session)
+        for apdr in pending_apdrs:
             if TYPE_CHECKING:
                 assert apdr.target_dag_id
 
-            if not (dag := self._get_current_dag(dag_id=apdr.target_dag_id, 
session=session)):
+            if not (dag := serialized_dags.get(apdr.target_dag_id)):
                 self.log.error("Dag '%s' not found in serialized_dag table", 
apdr.target_dag_id)
                 continue
 
-            asset_models = session.scalars(
-                select(AssetModel).where(
-                    exists(
-                        select(1).where(
-                            PartitionedAssetKeyLog.asset_id == AssetModel.id,
-                            PartitionedAssetKeyLog.asset_partition_dag_run_id 
== apdr.id,
-                            PartitionedAssetKeyLog.target_partition_key == 
apdr.partition_key,
-                        )
+            source_key_by_asset = source_key_by_asset_per_apdr[apdr.id]
+            timetable = dag.timetable
+            statuses: dict[SerializedAssetUniqueKey, bool] = {}
+            for asset_id, (name, uri) in asset_info_per_apdr[apdr.id].items():
+                key = SerializedAssetUniqueKey(name=name, uri=uri)
+                if isinstance(timetable, PartitionedAssetTimetable):

Review Comment:
   I don’t like using isinstance for this kind of logic branching in general. 
Can this be abstracted into a method or property on the timetable instead?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to