kaxil commented on code in PR #67867:
URL: https://github.com/apache/airflow/pull/67867#discussion_r3391451294


##########
airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py:
##########
@@ -166,6 +166,7 @@ class TIRescheduleStatePayload(StrictBaseModel):
     ]
     reschedule_date: UtcDateTime
     end_date: UtcDateTime
+    rendered_map_index: str | None = None

Review Comment:
   The PR description lists an `AddRenderedMapIndexToReschedulePayload` 
migration in `v2026_06_30.py`, but I don't see it in the diff. New payload 
fields need a Cadwyn `VersionChange` so older API version schemas stay accurate 
-- `AddRenderedMapIndexField` in `v2025_04_28.py` is the precedent for exactly 
this field on the other payloads. Since 2026-06-30 isn't released yet, 
`schema(TIRescheduleStatePayload).field("rendered_map_index").didnt_exist` in 
that file should do it. (Static checks being green doesn't cover this btw: the 
`check-execution-api-versions` prek hook skips the schema diff when run with 
`--all-files`.)



##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -1513,7 +1513,9 @@ def _on_term(signum, frame):
         log.info("::group::Post Execute")
         log.info("Rescheduling task, marking task as UP_FOR_RESCHEDULE")
         msg = RescheduleTask(
-            reschedule_date=reschedule.reschedule_date, 
end_date=datetime.now(tz=timezone.utc)
+            reschedule_date=reschedule.reschedule_date,
+            end_date=datetime.now(tz=timezone.utc),
+            rendered_map_index=ti.rendered_map_index,

Review Comment:
   Worth stating in the description: on main this doesn't change what users 
see. #57208 (shipped in 3.2.0) already persists the label on this path: the 
`except Exception` block above re-renders and sends `SetRenderedMapIndex` 
(`AirflowRescheduleException` is an `Exception`), and the supervisor writes it 
to the DB before the reschedule call goes out. That's why the issue reproduces 
on your 3.1.2 build but not on main, as kevinhongzl found. Carrying the value 
in the reschedule payload itself is still reasonable for consistency with the 
other state payloads, but #67521 as reported only affects 3.0.x-3.1.x, so 
actually fixing affected users is a backport question rather than this merge.



##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -722,7 +722,10 @@ def _create_ti_state_update_query_and_update_state(
             query = TI.duration_expression_update(ti_patch_payload.end_date, 
query, session.bind)
         # clear the next_method and next_kwargs so that none of the retries 
pick them up
         updated_state = TaskInstanceState.UP_FOR_RESCHEDULE
-        query = query.values(state=updated_state, next_method=None, 
next_kwargs=None)
+        reschedule_values: dict[str, Any] = {"state": updated_state, 
"next_method": None, "next_kwargs": None}
+        if ti_patch_payload.rendered_map_index is not None:

Review Comment:
   Nothing exercises this write. Can you extend 
`test_ti_update_state_to_reschedule` in 
`airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py`
 to send `rendered_map_index` in the payload and assert it lands on the TI? The 
new SDK test only checks the message construction on the worker side.



##########
task-sdk/tests/task_sdk/bases/test_sensor.py:
##########
@@ -310,6 +310,26 @@ def test_ok_with_custom_reschedule_exception(self, 
make_sensor, run_task):
             state, _, _ = run_task(sensor)
         assert state == TaskInstanceState.SUCCESS
 
+    def test_reschedule_includes_rendered_map_index(self, run_task, 
make_sensor, time_machine):
+        """Test that RescheduleTask message includes rendered_map_index when 
map_index_template is set."""
+        sensor = make_sensor(
+            return_value=None,
+            poke_interval=10,
+            timeout=25,
+            mode="reschedule",
+            map_index_template="{{ task.task_id }}",
+        )
+        sensor.poke = Mock(return_value=False)
+
+        date1 = timezone.utcnow()
+        time_machine.move_to(date1, tick=False)

Review Comment:
   `date1` and the time freeze aren't used by any assertion. Either assert 
`msg.end_date == date1` (the freeze makes that deterministic) or drop these two 
lines and the `time_machine` fixture.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to