Lee-W commented on code in PR #64571:
URL: https://github.com/apache/airflow/pull/64571#discussion_r3322125572


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1863,40 +1882,233 @@ def _do_scheduling(self, session: Session) -> int:
 
         return num_queued_tis
 
+    def _check_rollup_asset_status(
+        self,
+        *,
+        asset_id: int,
+        apdr: AssetPartitionDagRun,
+        mapper: RollupMapper,
+        actual_by_asset: dict[int, set[str]],
+    ) -> bool:
+        expected = mapper.to_upstream(apdr.partition_key)
+        return expected.issubset(actual_by_asset.get(asset_id, set()))
+
+    def _resolve_asset_partition_status(
+        self,
+        *,
+        session: Session,
+        asset_id: int,
+        name: str,
+        uri: str,
+        apdr: AssetPartitionDagRun,
+        timetable: PartitionedAssetTimetable,
+        actual_by_asset: dict[int, set[str]],
+    ) -> bool:
+        """
+        Return whether *asset_id* has been satisfied for *apdr*.
+
+        Non-rollup assets resolve to ``True`` because the caller only invokes
+        this for assets that already have at least one logged event for *APDR*
+        (see :class:`~airflow.models.asset.PartitionedAssetKeyLog`), which is
+        the non-rollup contract for "received". Rollup assets defer to
+        :meth:`_check_rollup_asset_status` for the upstream-window check.
+
+        A misconfigured mapper that raises returns ``False`` (treated as
+        not-yet-satisfied) and an audit log entry is written so the operator
+        can see why the Dag run is being held in the UI.
+        """
+        try:
+            mapper = timetable.get_partition_mapper(name=name, uri=uri)
+            if not mapper.is_rollup:
+                return True
+            return self._check_rollup_asset_status(
+                asset_id=asset_id,
+                apdr=apdr,
+                mapper=cast("RollupMapper", mapper),
+                actual_by_asset=actual_by_asset,
+            )
+        except Exception as err:
+            self.log.exception(
+                "Failed to evaluate rollup status for asset; treating as 
not-yet-satisfied. "
+                "This likely indicates a misconfigured partition mapper.",
+                dag_id=apdr.target_dag_id,
+                partition_key=apdr.partition_key,
+                asset_name=name,
+                asset_uri=uri,
+            )
+            audit_key = (apdr.target_dag_id, name, uri)
+            if audit_key not in self._partition_audit_seen:
+                # The audit Log row is committed on its own session so it
+                # survives even when the outer ``_create_dagruns_for_dags``
+                # transaction is rolled back (the caller is wrapped in
+                # ``@retry_db_transaction``, and a downstream 
``OperationalError``
+                # or scheduler crash mid-tick would otherwise drop the row 
while
+                # the in-memory set still suppressed the next attempt).
+                # The dedup-set update is gated on the independent commit
+                # succeeding; a transient DB failure on the audit session 
leaves
+                # the key unmarked so the next tick retries.
+                if self._record_partition_audit_log(apdr=apdr, name=name, 
uri=uri, err=err):
+                    self._partition_audit_seen.add(audit_key)
+            return False
+
+    def _record_partition_audit_log(
+        self,
+        *,
+        apdr: AssetPartitionDagRun,
+        name: str,
+        uri: str,
+        err: BaseException,
+    ) -> bool:
+        """
+        Persist a misconfigured-rollup audit Log row on an independent session.
+
+        Returns ``True`` on commit success, ``False`` when the write fails.
+        Failures are logged and swallowed: the audit row is advisory (the
+        warning above already captures the same information for operators
+        reading scheduler logs), and propagating a Log-table failure would
+        taint the scheduler tick.
+
+        APDR attributes are captured into locals before opening the audit
+        session so the new session never has to lazy-load through an instance
+        attached to the outer session.
+        """
+        target_dag_id = apdr.target_dag_id
+        extra = (
+            "Could not evaluate rollup status for partition_key "
+            f"'{apdr.partition_key}' on asset (name='{name}', uri='{uri}') "
+            f"in target Dag '{target_dag_id}'. This likely indicates "
+            "that the rollup mapper is misconfigured or does not support "
+            f"this partition key.\n{type(err).__name__}: {err}"
+        )
+        try:
+            # ``scoped=False`` so this really is a separate connection / 
session
+            # rather than the thread-scoped one shared with the outer
+            # transaction — otherwise an inner commit would close the outer
+            # session's state too.
+            with create_session(scoped=False) as audit_session:
+                audit_session.add(
+                    Log(
+                        event="failed to evaluate rollup status",
+                        dag_id=target_dag_id,
+                        extra=extra,
+                    )
+                )
+        except Exception:
+            self.log.warning(
+                "Failed to write audit Log row for misconfigured rollup 
mapper",
+                dag_id=target_dag_id,
+                asset_name=name,
+                asset_uri=uri,
+                exc_info=True,
+            )
+            return False
+        return True
+
     def _create_dagruns_for_partitioned_asset_dags(self, session: Session) -> 
set[str]:
+        """
+        Create Dag runs for pending :class:`AssetPartitionDagRun` rows whose 
partition is satisfied.
+
+        Returns the set of ``dag_id`` strings that received a new 
partition-driven Dag run in this
+        tick. The caller (:meth:`_create_dagruns_for_dags`) uses this set to 
exclude the same Dags
+        from the standard schedule-driven and asset-triggered creation paths 
so a single Dag never
+        gets two Dag runs for the same tick when it appears in more than one 
creation path. We
+        return ``dag_id`` strings rather than full Dag/DagRun objects because 
the only downstream
+        use is membership lookup, and a heavier return type would just be 
discarded.
+
+        Asset deactivation freezes pending APDRs: when an asset becomes 
inactive
+        (orphan — no Dag declares it any more), its ``PartitionedAssetKeyLog`` 
rows
+        stop contributing to the rollup. If the consumer Dag still depends on 
that
+        asset, firing on stale history would conflict with the declared 
topology,
+        so the APDR waits. Reactivating the asset resumes evaluation 
automatically.
+        This matches the UI's progress view (``_fetch_active_assets_per_dag``).
+        """
+        # Cap per-tick work so the scheduler transaction stays bounded and 
other
+        # scheduling work isn't starved. Remaining APDRs drain across 
subsequent ticks.
+        # FIFO is intentional: the oldest pending APDR fires first. A 
persistently
+        # unsatisfiable APDR at the head (e.g. broken mapper, upstream that 
will
+        # never arrive) blocks newer ones until an operator removes it or fixes
+        # the underlying mapper. We surface the stuck state rather than 
silently
+        # rotating past it.
+        pending_apdrs = session.scalars(
+            select(AssetPartitionDagRun)
+            .join(DagModel, DagModel.dag_id == 
AssetPartitionDagRun.target_dag_id)
+            .where(
+                AssetPartitionDagRun.created_dag_run_id.is_(None),
+                DagModel.is_stale.is_(False),
+            )
+            .order_by(AssetPartitionDagRun.created_at)
+            .limit(self._max_partition_dag_runs_per_loop)
+        ).all()
+        if not pending_apdrs:
+            return set()
+
         partition_dag_ids: set[str] = set()
+        pending_apdr_ids = [apdr.id for apdr in pending_apdrs]
 
-        evaluator = AssetEvaluator(session)
-        for apdr in session.scalars(
-            
select(AssetPartitionDagRun).where(AssetPartitionDagRun.created_dag_run_id.is_(None))
+        # Pre-fetch all required serialized Dags in one query.
+        dag_ids = list({apdr.target_dag_id for apdr in pending_apdrs})
+        # {"dag_id": Serialized Dag}
+        serialized_dags: dict[str, SerializedDAG] = {}
+        for serdag in 
SerializedDagModel.get_latest_serialized_dags(dag_ids=dag_ids, session=session):

Review Comment:
   APDR now carries a `dag_version_id`, stamped at creation from 
`serdag.dag_version_id`. On each scheduler tick, before evaluation, APDRs whose 
stamp no longer matches the latest serialized Dag version are discarded, along 
with their `PartitionedAssetKeyLog` rows, so a mid-window mapper / window swap 
can neither fire on partial data nor hold forever. An audit Log entry is 
written per cleared Dag.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to