uranusjr commented on code in PR #64571:
URL: https://github.com/apache/airflow/pull/64571#discussion_r3339816384
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1863,40 +1871,225 @@ def _do_scheduling(self, session: Session) -> int:
return num_queued_tis
+ def _check_rollup_asset_status(
+ self,
+ *,
+ asset_id: int,
+ apdr: AssetPartitionDagRun,
+ mapper: RollupMapper,
+ actual_by_asset: dict[int, set[str]],
+ ) -> bool:
+ expected = mapper.to_upstream(apdr.partition_key)
+ return expected.issubset(actual_by_asset.get(asset_id, set()))
+
+ def _resolve_asset_partition_status(
+ self,
+ *,
+ session: Session,
+ asset_id: int,
+ name: str,
+ uri: str,
+ apdr: AssetPartitionDagRun,
+ timetable: PartitionedAssetTimetable,
+ actual_by_asset: dict[int, set[str]],
+ ) -> bool:
+ """
+ Return whether *asset_id* has been satisfied for *apdr*.
+
+ Non-rollup assets resolve to ``True`` because the caller only invokes
+ this for assets that already have at least one logged event for *APDR*
+ (see :class:`~airflow.models.asset.PartitionedAssetKeyLog`), which is
+ the non-rollup contract for "received". Rollup assets defer to
+ :meth:`_check_rollup_asset_status` for the upstream-window check.
+
+ A misconfigured mapper that raises returns ``False`` (treated as
+ not-yet-satisfied); the exception is logged at ``ERROR`` level in the
+ scheduler log so operators can diagnose the misconfiguration.
+ """
+ try:
+ mapper = timetable.get_partition_mapper(name=name, uri=uri)
+ if not mapper.is_rollup:
Review Comment:
This can use a `TypeGuard` helper to avoid `cast` (see how `is_mapped` is
used)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]