uranusjr commented on code in PR #66848:
URL: https://github.com/apache/airflow/pull/66848#discussion_r3378468373


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1936,11 +1942,45 @@ def _resolve_asset_partition_status(
             mapper = timetable.get_partition_mapper(name=name, uri=uri)
             if not is_rollup(mapper):
                 return True
+            if TYPE_CHECKING:
+                assert apdr.partition_key is not None
+            expected = mapper.to_upstream(apdr.partition_key)
+            actual = actual_by_asset.get(asset_id, set())
+
+            # ``WaitForAll`` fast path: ``set.issubset`` short-circuits on the
+            # first missing key, so a typical backfill / catch-up tick with 
even
+            # one upstream gap returns without materializing the full 
intersection.
+            # The general dispatcher below pays O(|expected|) per call to stay 
a
+            # pure function of counts; this branch keeps the common case 
bounded
+            # by the *first* missing key rather than window cardinality.
+            if isinstance(mapper.wait_policy, WaitForAll):
+                return expected.issubset(actual)
+
+            # A policy where the threshold can never be met given the rollup
+            # window's cardinality is permanently unreachable. Surface this as 
a
+            # deduplicated warning so the operator can spot a stuck APDR in
+            # scheduler logs instead of seeing it silently never fire. Return
+            # ``False`` directly to skip the intersection that cannot succeed.
+            if mapper.wait_policy.is_unreachable(len(expected)):
+                unreachable_key = (apdr.target_dag_id, name, uri)
+                if unreachable_key not in self._partition_unreachable_seen:
+                    self.log.warning(
+                        "Wait policy %r is unreachable for asset (name=%r, 
uri=%r) on Dag %r "
+                        "given the window's cardinality %d; downstream Dag run 
is permanently "
+                        "unreachable.",
+                        mapper.wait_policy,
+                        name,
+                        uri,
+                        apdr.target_dag_id,
+                        len(expected),
+                    )
+                    self._partition_unreachable_seen.add(unreachable_key)
+                return False
+

Review Comment:
   It is likely better to put this block into a function on the wait_policy 
class. (And the isinstance block be handled with overriding)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to