nathadfield commented on code in PR #67285:
URL: https://github.com/apache/airflow/pull/67285#discussion_r3442125696
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2086,7 +2091,7 @@ def _resolve_partition_date(
return None
if not anchors:
- return None
+ return carried_partition_date
Review Comment:
@kaxil right, that's the intended contract: a partitioned consumer's feeding
assets are expected to agree on the partition datetime. When a temporal
mapper resolves an anchor, that anchor takes precedence over the carried
identity date (the key is the authoritative source the scheduler can
re-derive). I've added a comment here to make that explicit rather than implicit.
##########
airflow-core/src/airflow/assets/manager.py:
##########
@@ -695,6 +722,27 @@ def _get_or_create_apdr(
.limit(1)
)
if latest_apdr and latest_apdr.created_dag_run_id is None:
+ existing_partition_date = latest_apdr.partition_date
+ if (
+ existing_partition_date is not None
+ and target_partition_date is not None
+ and existing_partition_date != target_partition_date
Review Comment:
@kaxil good catch on the None-then-`D` drop; that was an unintended gap
rather than a deliberate choice. I've changed it to adopt the incoming date
when the APDR carries none yet, so a later identity date is no longer dropped.
That also makes suppression non-sticky (a post-conflict `None` is re-adoptable).
Genuine `D1`/`D2` conflicts still suppress to `None`. I've added tests for the
adopt and recover-after-conflict paths.
##########
task-sdk/tests/task_sdk/execution_time/test_task_runner.py:
##########
@@ -2027,6 +2028,39 @@ def test_partition_key_in_context(self,
create_runtime_ti, mock_supervisor_comms
context = runtime_ti.get_template_context()
assert context["partition_key"] == "some-partition"
+ def test_partition_date_in_context(self, create_runtime_ti,
mock_supervisor_comms):
+ """Test that partition_date from dag_run is exposed in the template
context."""
+ task = BaseOperator(task_id="hello")
+ runtime_ti = create_runtime_ti(task=task, dag_id="basic_task")
+
+ dr = runtime_ti._ti_context_from_server.dag_run
+
+ mock_supervisor_comms.send.return_value = PrevSuccessfulDagRunResult(
+ data_interval_end=dr.logical_date - timedelta(hours=1),
+ data_interval_start=dr.logical_date - timedelta(hours=2),
+ start_date=dr.start_date - timedelta(hours=1),
+ end_date=dr.start_date,
+ )
+
+ context = runtime_ti.get_template_context()
+
+ # Default: partition_date is None
+ assert context["partition_date"] is None
+
+ # Set partition_date on dag_run and verify it surfaces in context
+ partition_date = timezone.datetime(2026, 5, 20, 1, 0, 0)
+ dr.partition_date = partition_date
+ context = runtime_ti.get_template_context()
+ assert context["partition_date"] == partition_date
+
+ # Naive datetime is coerced to tz-aware so Jinja `| ds` / `| ts`
filters
+ # operate on a real awareness boundary.
+ from datetime import datetime as _datetime
Review Comment:
@kaxil done, dropped the aliased inline import and used the module-level
`datetime`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]