kaxil commented on code in PR #67285:
URL: https://github.com/apache/airflow/pull/67285#discussion_r3442034157
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2086,7 +2091,7 @@ def _resolve_partition_date(
return None
if not anchors:
- return None
+ return carried_partition_date
Review Comment:
When an APDR mixes an `IdentityMapper` (which carries a date) with a
temporal mapper on the same `target_key`, `carried_partition_date` is only
honored on this `not anchors` path. The moment any temporal mapper resolves an
anchor, `return anchors.pop()` (or the >1-anchor warning path) supersedes the
carried date, with no carry-vs-anchor comparison and no warning even when the
two disagree. `partition_mapper_config` is per-asset and a single APDR can be
fed by multiple assets, so a mixed identity+temporal feed is reachable. Is the
assumption that identity and temporal mappers never feed the same partitioned
consumer? If so, worth asserting/logging it here rather than relying on it
implicitly.
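
A minimal sketch of what an explicit carry-vs-anchor check could look like. The
function name, signature, and log messages are hypothetical stand-ins for the
tail of `_resolve_partition_date`, and returning `None` on the >1-anchor path
is an assumption, since that path is not shown in this hunk.

```python
from __future__ import annotations

import logging
from datetime import datetime

log = logging.getLogger(__name__)


def resolve_with_carry_check(
    anchors: set[datetime], carried_partition_date: datetime | None
) -> datetime | None:
    """Hypothetical stand-in: pick a partition date and surface disagreements."""
    if not anchors:
        # No temporal mapper resolved an anchor: fall back to the carried date.
        return carried_partition_date
    if len(anchors) > 1:
        # Assumption: ambiguous anchors yield no date; the real path may differ.
        log.warning("Ambiguous partition anchors: %s", sorted(anchors))
        return None
    anchor = next(iter(anchors))
    if carried_partition_date is not None and carried_partition_date != anchor:
        # The anchor still wins, but the mismatch is no longer silent.
        log.warning(
            "Carried partition date %s disagrees with temporal anchor %s",
            carried_partition_date,
            anchor,
        )
    return anchor
```

The anchor keeps precedence here, so the sketch only makes the implicit
assumption visible; it does not change which date is chosen.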
##########
task-sdk/tests/task_sdk/execution_time/test_task_runner.py:
##########
@@ -2027,6 +2028,39 @@ def test_partition_key_in_context(self, create_runtime_ti, mock_supervisor_comms
context = runtime_ti.get_template_context()
assert context["partition_key"] == "some-partition"
+ def test_partition_date_in_context(self, create_runtime_ti, mock_supervisor_comms):
+ """Test that partition_date from dag_run is exposed in the template context."""
+ task = BaseOperator(task_id="hello")
+ runtime_ti = create_runtime_ti(task=task, dag_id="basic_task")
+
+ dr = runtime_ti._ti_context_from_server.dag_run
+
+ mock_supervisor_comms.send.return_value = PrevSuccessfulDagRunResult(
+ data_interval_end=dr.logical_date - timedelta(hours=1),
+ data_interval_start=dr.logical_date - timedelta(hours=2),
+ start_date=dr.start_date - timedelta(hours=1),
+ end_date=dr.start_date,
+ )
+
+ context = runtime_ti.get_template_context()
+
+ # Default: partition_date is None
+ assert context["partition_date"] is None
+
+ # Set partition_date on dag_run and verify it surfaces in context
+ partition_date = timezone.datetime(2026, 5, 20, 1, 0, 0)
+ dr.partition_date = partition_date
+ context = runtime_ti.get_template_context()
+ assert context["partition_date"] == partition_date
+
+ # Naive datetime is coerced to tz-aware so Jinja `| ds` / `| ts` filters
+ # operate on a real awareness boundary.
+ from datetime import datetime as _datetime
Review Comment:
Nit: redundant inline import. `datetime` is already imported at the module
top (line 27), so this aliased `from datetime import datetime as _datetime` can
be dropped and `datetime(2026, 5, 20, 1, 0, 0)` used directly (it's naive just
the same). Matches the inline-import cleanups requested earlier in the review.
##########
airflow-core/src/airflow/assets/manager.py:
##########
@@ -695,6 +722,27 @@ def _get_or_create_apdr(
.limit(1)
)
if latest_apdr and latest_apdr.created_dag_run_id is None:
+ existing_partition_date = latest_apdr.partition_date
+ if (
+ existing_partition_date is not None
+ and target_partition_date is not None
+ and existing_partition_date != target_partition_date
Review Comment:
This conflict guard only ever clears; it never adopts or recovers. If the
APDR is first created with `partition_date=None` (a temporal/composite event,
or an identity event whose producer had no date) and a later identity event
carries `D`, the guard is skipped because `existing_partition_date is None`, so
`D` is silently dropped. If no temporal mapper feeds this key, there is then
nothing to re-derive it from at run creation. Suppression is also sticky: once
a `D1`/`D2` conflict has cleared the date to `None`, later agreeing events
(`D2`/`D2`) never restore it. The new tests cover the `D1` vs `D2` suppression
but not the None-then-`D` adopt path or the recover-after-conflict chain. Is
the permanent-suppress behavior intentional?