This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-6-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit f2127a32150d5e035906ab3776761b4384b643cd Author: Anthony Bush <[email protected]> AuthorDate: Sat Apr 29 17:56:14 2023 -0500 Fix backfill KeyError when try_number out of sync (#30653) Co-authored-by: Anthony Bush <[email protected]> (cherry picked from commit 17d1f3a7bbb0fc528e7e9f082d7ada3caddcf5e1) --- airflow/jobs/backfill_job_runner.py | 28 ++++++++++++++-------------- tests/jobs/test_backfill_job.py | 30 +++++++++++++++++++++--------- 2 files changed, 35 insertions(+), 23 deletions(-) diff --git a/airflow/jobs/backfill_job_runner.py b/airflow/jobs/backfill_job_runner.py index 4a78890d3b..d28bbadff1 100644 --- a/airflow/jobs/backfill_job_runner.py +++ b/airflow/jobs/backfill_job_runner.py @@ -186,39 +186,39 @@ class BackfillJobRunner(BaseJobRunner, LoggingMixin): refreshed_tis = [] TI = TaskInstance + ti_primary_key_to_ti_key = {ti_key.primary: ti_key for ti_key in ti_status.running.keys()} + filter_for_tis = TI.filter_for_tis(list(ti_status.running.values())) if filter_for_tis is not None: refreshed_tis = session.query(TI).filter(filter_for_tis).all() for ti in refreshed_tis: - # Here we remake the key by subtracting 1 to match in memory information - reduced_key = ti.key.reduced + # Use primary key to match in memory information + ti_key = ti_primary_key_to_ti_key[ti.key.primary] if ti.state == TaskInstanceState.SUCCESS: - ti_status.succeeded.add(reduced_key) + ti_status.succeeded.add(ti_key) self.log.debug("Task instance %s succeeded. Don't rerun.", ti) - ti_status.running.pop(reduced_key) + ti_status.running.pop(ti_key) continue if ti.state == TaskInstanceState.SKIPPED: - ti_status.skipped.add(reduced_key) + ti_status.skipped.add(ti_key) self.log.debug("Task instance %s skipped. 
Don't rerun.", ti) - ti_status.running.pop(reduced_key) + ti_status.running.pop(ti_key) continue if ti.state == TaskInstanceState.FAILED: self.log.error("Task instance %s failed", ti) - ti_status.failed.add(reduced_key) - ti_status.running.pop(reduced_key) + ti_status.failed.add(ti_key) + ti_status.running.pop(ti_key) continue # special case: if the task needs to run again put it back if ti.state == TaskInstanceState.UP_FOR_RETRY: self.log.warning("Task instance %s is up for retry", ti) - ti_status.running.pop(reduced_key) + ti_status.running.pop(ti_key) ti_status.to_run[ti.key] = ti # special case: if the task needs to be rescheduled put it back elif ti.state == TaskInstanceState.UP_FOR_RESCHEDULE: self.log.warning("Task instance %s is up for reschedule", ti) - # During handling of reschedule state in ti._handle_reschedule, try number is reduced - # by one, so we should not use reduced_key to avoid key error - ti_status.running.pop(ti.key) + ti_status.running.pop(ti_key) ti_status.to_run[ti.key] = ti # special case: The state of the task can be set to NONE by the task itself # when it reaches concurrency limits. 
It could also happen when the state @@ -232,13 +232,13 @@ class BackfillJobRunner(BaseJobRunner, LoggingMixin): ti, ) tis_to_be_scheduled.append(ti) - ti_status.running.pop(reduced_key) + ti_status.running.pop(ti_key) ti_status.to_run[ti.key] = ti # special case: Deferrable task can go from DEFERRED to SCHEDULED; # when that happens, we need to put it back as in UP_FOR_RESCHEDULE elif ti.state == TaskInstanceState.SCHEDULED: self.log.debug("Task instance %s is resumed from deferred state", ti) - ti_status.running.pop(ti.key) + ti_status.running.pop(ti_key) ti_status.to_run[ti.key] = ti # Batch schedule of task instances diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py index 164d5af1df..37ed6f8a85 100644 --- a/tests/jobs/test_backfill_job.py +++ b/tests/jobs/test_backfill_job.py @@ -1504,12 +1504,12 @@ class TestBackfillJob: # Test for success # The in-memory task key in ti_status.running contains a try_number - # that is always one behind the DB. The _update_counters method however uses - # a reduced_key to handle this. To test this, we mark the task as running in-memory - # and then increase the try number as it would be before the raw task is executed. - # When updating the counters the reduced_key will be used which will match what's - # in the in-memory ti_status.running map. This is the same for skipped, failed - # and retry states. + # that is not in sync with the DB. To test that _update_counters method + # handles this, we mark the task as running in-memory and then increase + # the try number as it would be before the raw task is executed. + # When updating the counters the in-memory key will be used which will + # match what's in the in-memory ti_status.running map. This is the same + # for skipped, failed and retry states. 
ti_status.running[ti.key] = ti # Task is queued and marked as running ti._try_number += 1 # Try number is increased during ti.run() ti.set_state(State.SUCCESS, session) # Task finishes with success state @@ -1522,6 +1522,19 @@ class TestBackfillJob: ti_status.succeeded.clear() + # Test for success when DB try_number is off from in-memory expectations + ti_status.running[ti.key] = ti + ti._try_number += 2 + ti.set_state(State.SUCCESS, session) + job_runner._update_counters(ti_status=ti_status, session=session) + assert len(ti_status.running) == 0 + assert len(ti_status.succeeded) == 1 + assert len(ti_status.skipped) == 0 + assert len(ti_status.failed) == 0 + assert len(ti_status.to_run) == 0 + + ti_status.succeeded.clear() + # Test for skipped ti_status.running[ti.key] = ti ti._try_number += 1 @@ -1566,8 +1579,7 @@ class TestBackfillJob: # rescheduled (which makes sense because it's the _same_ try, but it's # just being rescheduled to a later time). This now makes the in-memory # and DB representation of the task try_number the _same_, which is unlike - # the above cases. But this is okay because the reduced_key is NOT used for - # the rescheduled case in _update_counters, for this exact reason. + # the above cases. But this is okay because the in-memory key is used. ti_status.running[ti.key] = ti # Task queued and marked as running # Note: Both the increase and decrease are kept here for context ti._try_number += 1 # Try number is increased during ti.run() @@ -1585,7 +1597,7 @@ class TestBackfillJob: # test for none ti.set_state(State.NONE, session) # Setting ti._try_number = 0 brings us to ti.try_number==1 - # so that the reduced_key access will work fine + # so that the in-memory key access will work fine ti._try_number = 0 assert ti.try_number == 1 # see ti.try_number property in taskinstance module session.merge(ti)
