This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-3-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit b9fb473eb3eb1dffe76e07dd9e96ca72688cb1e5 Author: Niko <[email protected]> AuthorDate: Mon Jun 13 00:02:56 2022 -0700 DebugExecutor use ti.run() instead of ti._run_raw_task (#24357) The DebugExecutor previously executed tasks by calling the "private" ti._run_raw_task(...) method instead of ti.run(...). But the latter contains the logic to increase task instance try_numbers when running, thus tasks executed with the DebugExecutor were never getting their try_numbers increased and for rescheduled tasks this led to off-by-one errors (as the logic to reduce the try_number for the reschedule was still working while the increase was not). (cherry picked from commit da7b22be2986fc3217ac4d7fa6c3831d87ccff87) --- airflow/executors/debug_executor.py | 2 +- tests/dags/test_sensor.py | 33 +++++++++++++++++++ tests/executors/test_debug_executor.py | 4 +-- tests/jobs/test_backfill_job.py | 58 ++++++++++++++++++++++------------ 4 files changed, 73 insertions(+), 24 deletions(-) diff --git a/airflow/executors/debug_executor.py b/airflow/executors/debug_executor.py index 865186dd18..6c8a13b345 100644 --- a/airflow/executors/debug_executor.py +++ b/airflow/executors/debug_executor.py @@ -76,7 +76,7 @@ class DebugExecutor(BaseExecutor): key = ti.key try: params = self.tasks_params.pop(ti.key, {}) - ti._run_raw_task(job_id=ti.job_id, **params) + ti.run(job_id=ti.job_id, **params) self.change_state(key, State.SUCCESS) ti._run_finished_callback() return True diff --git a/tests/dags/test_sensor.py b/tests/dags/test_sensor.py new file mode 100644 index 0000000000..760300ef00 --- /dev/null +++ b/tests/dags/test_sensor.py @@ -0,0 +1,33 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import datetime + +from airflow import DAG +from airflow.decorators import task +from airflow.sensors.date_time import DateTimeSensor +from airflow.utils import timezone + +with DAG( + dag_id='test_sensor', start_date=datetime.datetime(2022, 1, 1), catchup=False, schedule_interval='@once' +) as dag: + + @task + def get_date(): + return str(timezone.utcnow() + datetime.timedelta(seconds=3)) + + DateTimeSensor(task_id='dts', target_time=str(get_date()), poke_interval=1, mode='reschedule') diff --git a/tests/executors/test_debug_executor.py b/tests/executors/test_debug_executor.py index dbce6aca40..371fbc5213 100644 --- a/tests/executors/test_debug_executor.py +++ b/tests/executors/test_debug_executor.py @@ -49,7 +49,7 @@ class TestDebugExecutor: succeeded = executor._run_task(task_instance_mock) assert succeeded - task_instance_mock._run_raw_task.assert_called_once_with(job_id=job_id) + task_instance_mock.run.assert_called_once_with(job_id=job_id) def test_queue_task_instance(self): key = "ti_key" @@ -100,7 +100,7 @@ class TestDebugExecutor: ti1 = MagicMock(key="t1") ti2 = MagicMock(key="t2") - ti1._run_raw_task.side_effect = Exception + ti1.run.side_effect = Exception executor.tasks_to_run = [ti1, ti2] diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py index e5ab189f35..5222f9cdbe 100644 --- a/tests/jobs/test_backfill_job.py +++ b/tests/jobs/test_backfill_job.py @@ -1309,10 +1309,18 @@ class TestBackfillJob: ti_status = BackfillJob._DagRunTaskStatus() - # test for success - ti.set_state(State.SUCCESS, session) - ti_status.running[ti.key] = ti - job._update_counters(ti_status=ti_status, session=session) + # Test for success + # The in-memory task key in ti_status.running contains a try_number + # that is always one behind the DB. The _update_counters method however uses + # a reduced_key to handle this. To test this, we mark the task as running in-memory + # and then increase the try number as it would be before the raw task is executed. + # When updating the counters the reduced_key will be used which will match what's + # in the in-memory ti_status.running map. This is the same for skipped, failed + # and retry states. + ti_status.running[ti.key] = ti # Task is queued and marked as running + ti._try_number += 1 # Try number is increased during ti.run() + ti.set_state(State.SUCCESS, session) # Task finishes with success state + job._update_counters(ti_status=ti_status, session=session) # Update counters assert len(ti_status.running) == 0 assert len(ti_status.succeeded) == 1 assert len(ti_status.skipped) == 0 @@ -1321,9 +1329,10 @@ class TestBackfillJob: ti_status.succeeded.clear() - # test for skipped - ti.set_state(State.SKIPPED, session) + # Test for skipped ti_status.running[ti.key] = ti + ti._try_number += 1 + ti.set_state(State.SKIPPED, session) job._update_counters(ti_status=ti_status, session=session) assert len(ti_status.running) == 0 assert len(ti_status.succeeded) == 0 @@ -1333,9 +1342,10 @@ class TestBackfillJob: ti_status.skipped.clear() - # test for failed - ti.set_state(State.FAILED, session) + # Test for failed ti_status.running[ti.key] = ti + ti._try_number += 1 + ti.set_state(State.FAILED, session) job._update_counters(ti_status=ti_status, session=session) assert len(ti_status.running) == 0 assert len(ti_status.succeeded) == 0 @@ -1345,9 +1355,10 @@ class TestBackfillJob: ti_status.failed.clear() - # test for retry - ti.set_state(State.UP_FOR_RETRY, session) + # Test for retry ti_status.running[ti.key] = ti + ti._try_number += 1 + ti.set_state(State.UP_FOR_RETRY, session) job._update_counters(ti_status=ti_status, session=session) assert len(ti_status.running) == 0 assert len(ti_status.succeeded) == 0 @@ -1357,13 +1368,18 @@ class TestBackfillJob: ti_status.to_run.clear() - # test for reschedule - # For rescheduled state, tests that reduced_key is not - # used by upping try_number. - ti._try_number = 2 - ti.set_state(State.UP_FOR_RESCHEDULE, session) - assert ti.try_number == 3 # see ti.try_number property in taskinstance module - ti_status.running[ti.key] = ti + # Test for reschedule + # Logic in taskinstance reduces the try number for a task that's been + # rescheduled (which makes sense because it's the _same_ try, but it's + # just being rescheduled to a later time). This now makes the in-memory + # and DB representation of the task try_number the _same_, which is unlike + # the above cases. But this is okay because the reduced_key is NOT used for + # the rescheduled case in _update_counters, for this exact reason. + ti_status.running[ti.key] = ti # Task queued and marked as running + # Note: Both the increase and decrease are kept here for context + ti._try_number += 1 # Try number is increased during ti.run() + ti._try_number -= 1 # Task is being rescheduled, decrement try_number + ti.set_state(State.UP_FOR_RESCHEDULE, session) # Task finishes with reschedule state job._update_counters(ti_status=ti_status, session=session) assert len(ti_status.running) == 0 assert len(ti_status.succeeded) == 0 @@ -1580,10 +1596,10 @@ class TestBackfillJob: @pytest.mark.long_running @pytest.mark.parametrize("executor_name", ["SequentialExecutor", "DebugExecutor"]) - @pytest.mark.parametrize("dag_id", ["test_mapped_classic", "test_mapped_taskflow"]) - def test_mapped_dag(self, dag_id, executor_name, session): + @pytest.mark.parametrize("dag_id", ["test_mapped_classic", "test_mapped_taskflow", "test_sensor"]) + def test_backfilling_dags(self, dag_id, executor_name, session): """ - End-to-end test of a simple mapped dag. + End-to-end test for backfilling dags with various executors. We test with multiple executors as they have different "execution environments" -- for instance DebugExecutor runs a lot more in the same process than other Executors. @@ -1595,7 +1611,7 @@ class TestBackfillJob: self.dagbag.process_file(str(TEST_DAGS_FOLDER / f'{dag_id}.py')) dag = self.dagbag.get_dag(dag_id) - when = datetime.datetime(2022, 1, 1) + when = timezone.datetime(2022, 1, 1) job = BackfillJob( dag=dag,
