This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-6-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit f2127a32150d5e035906ab3776761b4384b643cd
Author: Anthony Bush <[email protected]>
AuthorDate: Sat Apr 29 17:56:14 2023 -0500

    Fix backfill KeyError when try_number out of sync (#30653)
    
    Co-authored-by: Anthony Bush <[email protected]>
    (cherry picked from commit 17d1f3a7bbb0fc528e7e9f082d7ada3caddcf5e1)
---
 airflow/jobs/backfill_job_runner.py | 28 ++++++++++++++--------------
 tests/jobs/test_backfill_job.py     | 30 +++++++++++++++++++++---------
 2 files changed, 35 insertions(+), 23 deletions(-)

diff --git a/airflow/jobs/backfill_job_runner.py 
b/airflow/jobs/backfill_job_runner.py
index 4a78890d3b..d28bbadff1 100644
--- a/airflow/jobs/backfill_job_runner.py
+++ b/airflow/jobs/backfill_job_runner.py
@@ -186,39 +186,39 @@ class BackfillJobRunner(BaseJobRunner, LoggingMixin):
         refreshed_tis = []
         TI = TaskInstance
 
+        ti_primary_key_to_ti_key = {ti_key.primary: ti_key for ti_key in 
ti_status.running.keys()}
+
         filter_for_tis = TI.filter_for_tis(list(ti_status.running.values()))
         if filter_for_tis is not None:
             refreshed_tis = session.query(TI).filter(filter_for_tis).all()
 
         for ti in refreshed_tis:
-            # Here we remake the key by subtracting 1 to match in memory 
information
-            reduced_key = ti.key.reduced
+            # Use primary key to match in memory information
+            ti_key = ti_primary_key_to_ti_key[ti.key.primary]
             if ti.state == TaskInstanceState.SUCCESS:
-                ti_status.succeeded.add(reduced_key)
+                ti_status.succeeded.add(ti_key)
                 self.log.debug("Task instance %s succeeded. Don't rerun.", ti)
-                ti_status.running.pop(reduced_key)
+                ti_status.running.pop(ti_key)
                 continue
             if ti.state == TaskInstanceState.SKIPPED:
-                ti_status.skipped.add(reduced_key)
+                ti_status.skipped.add(ti_key)
                 self.log.debug("Task instance %s skipped. Don't rerun.", ti)
-                ti_status.running.pop(reduced_key)
+                ti_status.running.pop(ti_key)
                 continue
             if ti.state == TaskInstanceState.FAILED:
                 self.log.error("Task instance %s failed", ti)
-                ti_status.failed.add(reduced_key)
-                ti_status.running.pop(reduced_key)
+                ti_status.failed.add(ti_key)
+                ti_status.running.pop(ti_key)
                 continue
             # special case: if the task needs to run again put it back
             if ti.state == TaskInstanceState.UP_FOR_RETRY:
                 self.log.warning("Task instance %s is up for retry", ti)
-                ti_status.running.pop(reduced_key)
+                ti_status.running.pop(ti_key)
                 ti_status.to_run[ti.key] = ti
             # special case: if the task needs to be rescheduled put it back
             elif ti.state == TaskInstanceState.UP_FOR_RESCHEDULE:
                 self.log.warning("Task instance %s is up for reschedule", ti)
-                # During handling of reschedule state in 
ti._handle_reschedule, try number is reduced
-                # by one, so we should not use reduced_key to avoid key error
-                ti_status.running.pop(ti.key)
+                ti_status.running.pop(ti_key)
                 ti_status.to_run[ti.key] = ti
             # special case: The state of the task can be set to NONE by the 
task itself
             # when it reaches concurrency limits. It could also happen when 
the state
@@ -232,13 +232,13 @@ class BackfillJobRunner(BaseJobRunner, LoggingMixin):
                     ti,
                 )
                 tis_to_be_scheduled.append(ti)
-                ti_status.running.pop(reduced_key)
+                ti_status.running.pop(ti_key)
                 ti_status.to_run[ti.key] = ti
             # special case: Deferrable task can go from DEFERRED to SCHEDULED;
             # when that happens, we need to put it back as in UP_FOR_RESCHEDULE
             elif ti.state == TaskInstanceState.SCHEDULED:
                 self.log.debug("Task instance %s is resumed from deferred 
state", ti)
-                ti_status.running.pop(ti.key)
+                ti_status.running.pop(ti_key)
                 ti_status.to_run[ti.key] = ti
 
         # Batch schedule of task instances
diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py
index 164d5af1df..37ed6f8a85 100644
--- a/tests/jobs/test_backfill_job.py
+++ b/tests/jobs/test_backfill_job.py
@@ -1504,12 +1504,12 @@ class TestBackfillJob:
 
         # Test for success
         # The in-memory task key in ti_status.running contains a try_number
-        # that is always one behind the DB. The _update_counters method 
however uses
-        # a reduced_key to handle this. To test this, we mark the task as 
running in-memory
-        # and then increase the try number as it would be before the raw task 
is executed.
-        # When updating the counters the reduced_key will be used which will 
match what's
-        # in the in-memory ti_status.running map. This is the same for 
skipped, failed
-        # and retry states.
+        # that is not in sync with the DB. To test that _update_counters method
+        # handles this, we mark the task as running in-memory and then increase
+        # the try number as it would be before the raw task is executed.
+        # When updating the counters the in-memory key will be used which will
+        # match what's in the in-memory ti_status.running map. This is the same
+        # for skipped, failed and retry states.
         ti_status.running[ti.key] = ti  # Task is queued and marked as running
         ti._try_number += 1  # Try number is increased during ti.run()
         ti.set_state(State.SUCCESS, session)  # Task finishes with success 
state
@@ -1522,6 +1522,19 @@ class TestBackfillJob:
 
         ti_status.succeeded.clear()
 
+        # Test for success when DB try_number is off from in-memory 
expectations
+        ti_status.running[ti.key] = ti
+        ti._try_number += 2
+        ti.set_state(State.SUCCESS, session)
+        job_runner._update_counters(ti_status=ti_status, session=session)
+        assert len(ti_status.running) == 0
+        assert len(ti_status.succeeded) == 1
+        assert len(ti_status.skipped) == 0
+        assert len(ti_status.failed) == 0
+        assert len(ti_status.to_run) == 0
+
+        ti_status.succeeded.clear()
+
         # Test for skipped
         ti_status.running[ti.key] = ti
         ti._try_number += 1
@@ -1566,8 +1579,7 @@ class TestBackfillJob:
         # rescheduled (which makes sense because it's the _same_ try, but it's
         # just being rescheduled to a later time). This now makes the in-memory
         # and DB representation of the task try_number the _same_, which is 
unlike
-        # the above cases. But this is okay because the reduced_key is NOT 
used for
-        # the rescheduled case in _update_counters, for this exact reason.
+        # the above cases. But this is okay because the in-memory key is used.
         ti_status.running[ti.key] = ti  # Task queued and marked as running
         # Note: Both the increase and decrease are kept here for context
         ti._try_number += 1  # Try number is increased during ti.run()
@@ -1585,7 +1597,7 @@ class TestBackfillJob:
         # test for none
         ti.set_state(State.NONE, session)
         # Setting ti._try_number = 0 brings us to ti.try_number==1
-        # so that the reduced_key access will work fine
+        # so that the in-memory key access will work fine
         ti._try_number = 0
         assert ti.try_number == 1  # see ti.try_number property in 
taskinstance module
         session.merge(ti)

Reply via email to