This is an automated email from the ASF dual-hosted git repository. jhtimmins pushed a commit to branch v2-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 0656f10339875e2920611ffe4ecdf49093c2d2ab Author: Jorrick Sleijster <[email protected]> AuthorDate: Fri Jul 23 00:25:08 2021 +0200 Fix: ``TaskInstance`` does not show ``queued_by_job_id`` & ``external_executor_id`` (#17179) **Problem discovery:** I was debugging a bug involving Airflow's `external_executor_id`, during which this UI bug caught my eye and annoyed me. I decided to fix this one first so that my other testing could go a bit more smoothly :) **Description of the problem:** Currently there is a bug in the Task Instance details (/task) view. It loads the TaskInstance by calling `TI(task, execution_date)` and then uses `refresh_from_db()` to refresh the many fields that are not filled in yet. However, this assumes that `refresh_from_db()` refreshes all values, which it does not: `external_executor_id` and `queued_by_job_id` are not updated at all, and `executor_config` is only set by the original `TI(task, execution_date)` call and is likewise not updated in `refresh_from_db()`. This is visible in the UI, where these values always show as None, while the TaskInstance view shows that they are not None. **The changes in the PR:** 1. Changes to the `refresh_from_db()` method to include the three missing values. 2. A new test that checks we really are updating ALL values in `refresh_from_db()`. 3. Removal of an incorrect comment, since we do need the `execution_date` for that view. 
(cherry picked from commit 759c76d7a5d23cc6f6ef4f724a1a322d2445bbd2) --- airflow/models/taskinstance.py | 3 +++ airflow/www/views.py | 2 -- tests/models/test_taskinstance.py | 54 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 57 insertions(+), 2 deletions(-) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index aeb2a22..38d7e22 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -643,7 +643,10 @@ class TaskInstance(Base, LoggingMixin): self.priority_weight = ti.priority_weight self.operator = ti.operator self.queued_dttm = ti.queued_dttm + self.queued_by_job_id = ti.queued_by_job_id self.pid = ti.pid + self.executor_config = ti.executor_config + self.external_executor_id = ti.external_executor_id else: self.state = None diff --git a/airflow/www/views.py b/airflow/www/views.py index f77e66c..e519ebf 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -1231,8 +1231,6 @@ class Airflow(AirflowBaseView): """Retrieve task.""" dag_id = request.args.get('dag_id') task_id = request.args.get('task_id') - # Carrying execution_date through, even though it's irrelevant for - # this context execution_date = request.args.get('execution_date') dttm = timezone.parse(execution_date) form = DateTimeForm(data={'execution_date': dttm}) diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index b6cd8b8..0b063f9 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -2026,6 +2026,60 @@ class TestTaskInstance(unittest.TestCase): assert ti.start_date < ti.end_date assert ti.duration > 0 + def test_refresh_from_db(self): + run_date = timezone.utcnow() + + expected_values = { + "task_id": "test_refresh_from_db_task", + "dag_id": "test_refresh_from_db_dag", + "execution_date": run_date, + "start_date": run_date + datetime.timedelta(days=1), + "end_date": run_date + datetime.timedelta(days=1, seconds=1, milliseconds=234), + "duration": 1.234, + 
"state": State.SUCCESS, + "_try_number": 1, + "max_tries": 1, + "hostname": "some_unique_hostname", + "unixname": "some_unique_unixname", + "job_id": 1234, + "pool": "some_fake_pool_id", + "pool_slots": 25, + "queue": "some_queue_id", + "priority_weight": 123, + "operator": "some_custom_operator", + "queued_dttm": run_date + datetime.timedelta(hours=1), + "queued_by_job_id": 321, + "pid": 123, + "executor_config": {"Some": {"extra": "information"}}, + "external_executor_id": "some_executor_id", + } + # Make sure we aren't missing any new value in our expected_values list. + expected_keys = {f"task_instance.{key.lstrip('_')}" for key in expected_values.keys()} + assert {str(c) for c in TI.__table__.columns} == expected_keys, ( + "Please add all non-foreign values of TaskInstance to this list. " + "This prevents refresh_from_db() from missing a field." + ) + + operator = DummyOperator(task_id=expected_values['task_id']) + ti = TI(task=operator, execution_date=expected_values['execution_date']) + for key, expected_value in expected_values.items(): + setattr(ti, key, expected_value) + with create_session() as session: + session.merge(ti) + session.commit() + + mock_task = mock.MagicMock() + mock_task.task_id = expected_values["task_id"] + mock_task.dag_id = expected_values["dag_id"] + + ti = TI(task=mock_task, execution_date=run_date) + ti.refresh_from_db() + for key, expected_value in expected_values.items(): + assert hasattr(ti, key), f"Key {key} is missing in the TaskInstance." + assert ( + getattr(ti, key) == expected_value + ), f"Key: {key} had different values. Make sure it loads it in the refresh refresh_from_db()" + @pytest.mark.parametrize("pool_override", [None, "test_pool2"]) def test_refresh_from_task(pool_override):
