Lee-W commented on code in PR #66848:
URL: https://github.com/apache/airflow/pull/66848#discussion_r3388560288
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1936,12 +1932,33 @@ def _resolve_asset_partition_status(
mapper = timetable.get_partition_mapper(name=name, uri=uri)
if not is_rollup(mapper):
return True
- return self._check_rollup_asset_status(
- asset_id=asset_id,
- apdr=apdr,
- mapper=mapper,
- actual_by_asset=actual_by_asset,
- )
+ if TYPE_CHECKING:
+ assert apdr.partition_key is not None
+ expected = mapper.to_upstream(apdr.partition_key)
+ actual = actual_by_asset.get(asset_id, set())
+
+ # A policy where the threshold can never be met given the rollup
+ # window's cardinality is permanently unreachable. Surface this as a
+ # deduplicated warning so the operator can spot a stuck APDR in
+ # scheduler logs instead of seeing it silently never fire. Return
+ # ``False`` directly to skip the intersection that cannot succeed.
+ if mapper.wait_policy.is_unreachable(len(expected)):
+ unreachable_key = (apdr.target_dag_id, name, uri)
+ if unreachable_key not in self._partition_unreachable_seen:
+ self.log.warning(
+ "Wait policy %r is unreachable for asset (name=%r,
uri=%r) on Dag %r "
+ "given the window's cardinality %d; downstream Dag run
is permanently "
+ "unreachable.",
+ mapper.wait_policy,
+ name,
+ uri,
+ apdr.target_dag_id,
+ len(expected),
+ )
+ self._partition_unreachable_seen.add(unreachable_key)
+ return False
+
+ return mapper.wait_policy.is_satisfied_by_keys(matched=actual,
expected=expected)
Review Comment:
Yep, most of the logic has now been moved. `is_satisfied_by_keys` now returns a
`PartitionSatisfaction`, so the scheduler makes a single call and branches on
the result instead of calling `is_unreachable` separately.
`_partition_unreachable_seen` stays in the scheduler and tracks the returned
`PartitionSatisfaction`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]