This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6fd7052f863 Move TaskInstance heartbeat directly on to TI row, not on Job row (#43599)
6fd7052f863 is described below

commit 6fd7052f863ad9fc95ea4b82f8993fc5858d0dc3
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Sun Nov 3 01:04:51 2024 +0000

    Move TaskInstance heartbeat directly on to TI row, not on Job row (#43599)
    
    This is part of the work for AIP-72 epic, but is done as a separate PR for
    ease of review.
    
    This PR by itself doesn't remove the LocalTaskJob row (that will happen in a
    future PR when the execution code is moved over to live in the TaskSDK) but
    this paves the way for it. The reasons we are making this change are:
    
    - Having a separate row for tracking TI heartbeat is not really buying us
      much
    - With the addition of TaskInstanceHistory we don't need _another_ separate
      record of when/where TIs were run
    - It simplifies things (one less join in finding zombies)
    - Makes zombie tracking easier -- it is now just on the TI state, not the
      combination of TI and Job state.
---
 airflow/cli/cli_config.py                          |   2 -
 airflow/cli/commands/task_command.py               |   1 -
 airflow/executors/debug_executor.py                |   2 +-
 airflow/jobs/local_task_job_runner.py              |   3 +-
 airflow/jobs/scheduler_job_runner.py               |  18 +-
 ...5_3_0_0_add_last_heartbeat_at_directly_to_ti.py |  60 ++++
 airflow/models/taskinstance.py                     |  54 ++--
 airflow/models/taskinstancehistory.py              |   1 -
 airflow/serialization/pydantic/taskinstance.py     |  16 +-
 airflow/task/standard_task_runner.py               |  23 +-
 airflow/utils/db.py                                |   2 +-
 airflow/www/views.py                               |   4 +-
 docs/apache-airflow/img/airflow_erd.sha256         |   2 +-
 docs/apache-airflow/img/airflow_erd.svg            | 310 ++++++++++-----------
 docs/apache-airflow/migrations-ref.rst             |   4 +-
 .../ci/pre_commit/check_ti_vs_tis_attributes.py    |   2 +
 .../test_mapped_task_instance_endpoint.py          |   1 -
 .../endpoints/test_task_instance_endpoint.py       |   1 -
 .../core_api/routes/public/test_task_instances.py  |   1 -
 tests/assets/test_manager.py                       |   1 +
 tests/cli/commands/test_task_command.py            |   2 +-
 tests/executors/test_debug_executor.py             |   2 +-
 tests/jobs/test_local_task_job.py                  |  18 +-
 tests/jobs/test_scheduler_job.py                   |  28 +-
 tests/models/test_taskinstance.py                  |   2 +-
 tests/www/views/test_views_tasks.py                |  14 +-
 26 files changed, 294 insertions(+), 280 deletions(-)

diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py
index 15543023cbf..06ac2f7bd81 100644
--- a/airflow/cli/cli_config.py
+++ b/airflow/cli/cli_config.py
@@ -580,7 +580,6 @@ ARG_SHIP_DAG = Arg(
     ("--ship-dag",), help="Pickles (serializes) the DAG and ships it to the 
worker", action="store_true"
 )
 ARG_PICKLE = Arg(("-p", "--pickle"), help="Serialized pickle object of the 
entire dag (used internally)")
-ARG_JOB_ID = Arg(("-j", "--job-id"), help=argparse.SUPPRESS)
 ARG_CFG_PATH = Arg(("--cfg-path",), help="Path to config file to use instead 
of airflow.cfg")
 ARG_MAP_INDEX = Arg(("--map-index",), type=int, default=-1, help="Mapped task 
index")
 ARG_READ_FROM_DB = Arg(("--read-from-db",), help="Read dag from DB instead of 
dag file", action="store_true")
@@ -1354,7 +1353,6 @@ TASKS_COMMANDS = (
             ARG_DEPENDS_ON_PAST,
             ARG_SHIP_DAG,
             ARG_PICKLE,
-            ARG_JOB_ID,
             ARG_INTERACTIVE,
             ARG_SHUT_DOWN_LOGGING,
             ARG_MAP_INDEX,
diff --git a/airflow/cli/commands/task_command.py 
b/airflow/cli/commands/task_command.py
index 23f6e1abbe5..03d2737072f 100644
--- a/airflow/cli/commands/task_command.py
+++ b/airflow/cli/commands/task_command.py
@@ -341,7 +341,6 @@ def _run_raw_task(args, ti: TaskInstance) -> None | 
TaskReturnCode:
     """Run the main task handling code."""
     return ti._run_raw_task(
         mark_success=args.mark_success,
-        job_id=args.job_id,
         pool=args.pool,
     )
 
diff --git a/airflow/executors/debug_executor.py 
b/airflow/executors/debug_executor.py
index 80fb673cab8..aead7e2b2c1 100644
--- a/airflow/executors/debug_executor.py
+++ b/airflow/executors/debug_executor.py
@@ -84,7 +84,7 @@ class DebugExecutor(BaseExecutor):
         key = ti.key
         try:
             params = self.tasks_params.pop(ti.key, {})
-            ti.run(job_id=ti.job_id, **params)
+            ti.run(**params)
             self.success(key)
             return True
         except Exception as e:
diff --git a/airflow/jobs/local_task_job_runner.py 
b/airflow/jobs/local_task_job_runner.py
index a33005b0a52..c900c88674e 100644
--- a/airflow/jobs/local_task_job_runner.py
+++ b/airflow/jobs/local_task_job_runner.py
@@ -159,7 +159,6 @@ class LocalTaskJobRunner(BaseJobRunner, LoggingMixin):
             
wait_for_past_depends_before_skipping=self.wait_for_past_depends_before_skipping,
             ignore_task_deps=self.ignore_task_deps,
             ignore_ti_state=self.ignore_ti_state,
-            job_id=str(self.job.id),
             pool=self.pool,
             external_executor_id=self.external_executor_id,
         ):
@@ -319,6 +318,8 @@ class LocalTaskJobRunner(BaseJobRunner, LoggingMixin):
                     "Recorded pid %s does not match the current pid %s", 
recorded_pid, current_pid
                 )
                 raise AirflowException("PID of job runner does not match")
+            ti.update_heartbeat()
+
         elif self.task_runner.return_code() is None and 
hasattr(self.task_runner, "process"):
             self._overtime = (timezone.utcnow() - (ti.end_date or 
timezone.utcnow())).total_seconds()
             if ti.state == TaskInstanceState.SKIPPED:
diff --git a/airflow/jobs/scheduler_job_runner.py 
b/airflow/jobs/scheduler_job_runner.py
index b763011e550..39e4e35087b 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -30,7 +30,7 @@ from functools import lru_cache, partial
 from pathlib import Path
 from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable, Iterator
 
-from sqlalchemy import and_, delete, exists, func, not_, or_, select, text, 
update
+from sqlalchemy import and_, delete, exists, func, not_, select, text, update
 from sqlalchemy.exc import OperationalError
 from sqlalchemy.orm import lazyload, load_only, make_transient, selectinload
 from sqlalchemy.sql import expression
@@ -777,7 +777,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                 "TaskInstance Finished: dag_id=%s, task_id=%s, run_id=%s, 
map_index=%s, "
                 "run_start_date=%s, run_end_date=%s, "
                 "run_duration=%s, state=%s, executor=%s, executor_state=%s, 
try_number=%s, max_tries=%s, "
-                "job_id=%s, pool=%s, queue=%s, priority_weight=%d, 
operator=%s, queued_dttm=%s, "
+                "pool=%s, queue=%s, priority_weight=%d, operator=%s, 
queued_dttm=%s, "
                 "queued_by_job_id=%s, pid=%s"
             )
             cls.logger().info(
@@ -794,7 +794,6 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                 state,
                 try_number,
                 ti.max_tries,
-                ti.job_id,
                 ti.pool,
                 ti.queue,
                 ti.priority_weight,
@@ -821,7 +820,6 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                 span.set_attribute("operator", str(ti.operator))
                 span.set_attribute("try_number", ti.try_number)
                 span.set_attribute("executor_state", state)
-                span.set_attribute("job_id", ti.job_id)
                 span.set_attribute("pool", ti.pool)
                 span.set_attribute("queue", ti.queue)
                 span.set_attribute("priority_weight", ti.priority_weight)
@@ -1977,22 +1975,20 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                 self._purge_zombies(zombies, session=session)
 
     def _find_zombies(self, *, session: Session) -> list[tuple[TI, str, str]]:
-        from airflow.jobs.job import Job
-
         self.log.debug("Finding 'running' jobs without a recent heartbeat")
         limit_dttm = timezone.utcnow() - 
timedelta(seconds=self._zombie_threshold_secs)
         zombies = session.execute(
             select(TI, DM.fileloc, DM.processor_subdir)
             .with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")
-            .join(Job, TI.job_id == Job.id)
             .join(DM, TI.dag_id == DM.dag_id)
-            .where(TI.state == TaskInstanceState.RUNNING)
-            .where(or_(Job.state != JobState.RUNNING, Job.latest_heartbeat < 
limit_dttm))
-            .where(Job.job_type == "LocalTaskJob")
+            .where(
+                TI.state.in_((TaskInstanceState.RUNNING, 
TaskInstanceState.RESTARTING)),
+                TI.last_heartbeat_at < limit_dttm,
+            )
             .where(TI.queued_by_job_id == self.job.id)
         ).all()
         if zombies:
-            self.log.warning("Failing (%s) jobs without heartbeat after %s", 
len(zombies), limit_dttm)
+            self.log.warning("Failing %s TIs without heartbeat after %s", 
len(zombies), limit_dttm)
         return zombies
 
     def _purge_zombies(self, zombies: list[tuple[TI, str, str]], *, session: 
Session) -> None:
diff --git 
a/airflow/migrations/versions/0045_3_0_0_add_last_heartbeat_at_directly_to_ti.py
 
b/airflow/migrations/versions/0045_3_0_0_add_last_heartbeat_at_directly_to_ti.py
new file mode 100644
index 00000000000..47e72de9dcb
--- /dev/null
+++ 
b/airflow/migrations/versions/0045_3_0_0_add_last_heartbeat_at_directly_to_ti.py
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Add last_heartbeat_at directly to TI.
+
+Revision ID: d8cd3297971e
+Revises: 5f57a45b8433
+Create Date: 2024-11-01 12:14:59.927266
+
+"""
+
+from __future__ import annotations
+
+import sqlalchemy as sa
+from alembic import op
+
+from airflow.migrations.db_types import TIMESTAMP
+
+# revision identifiers, used by Alembic.
+revision = "d8cd3297971e"
+down_revision = "5f57a45b8433"
+branch_labels = None
+depends_on = None
+airflow_version = "3.0.0"
+
+
+def upgrade():
+    with op.batch_alter_table("task_instance", schema=None) as batch_op:
+        batch_op.add_column(sa.Column("last_heartbeat_at", 
TIMESTAMP(timezone=True), nullable=True))
+        batch_op.drop_index("ti_job_id")
+        batch_op.create_index("ti_heartbeat", ["last_heartbeat_at"], 
unique=False)
+        batch_op.drop_column("job_id")
+    with op.batch_alter_table("task_instance_history", schema=None) as 
batch_op:
+        batch_op.drop_column("job_id")
+
+
+def downgrade():
+    with op.batch_alter_table("task_instance", schema=None) as batch_op:
+        batch_op.add_column(sa.Column("job_id", sa.INTEGER(), 
autoincrement=False, nullable=True))
+        batch_op.drop_index("ti_heartbeat")
+        batch_op.create_index("ti_job_id", ["job_id"], unique=False)
+        batch_op.drop_column("last_heartbeat_at")
+    with op.batch_alter_table("task_instance_history", schema=None) as 
batch_op:
+        batch_op.add_column(sa.Column("job_id", sa.INTEGER(), 
autoincrement=False, nullable=True))
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index bb07ba6d848..e86c4777824 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -133,7 +133,7 @@ from airflow.utils.sqlalchemy import (
     tuple_in_condition,
     with_row_locks,
 )
-from airflow.utils.state import DagRunState, JobState, State, TaskInstanceState
+from airflow.utils.state import DagRunState, State, TaskInstanceState
 from airflow.utils.task_group import MappedTaskGroup
 from airflow.utils.task_instance_session import 
set_current_task_instance_session
 from airflow.utils.timeout import timeout
@@ -221,11 +221,16 @@ def _add_log(
     )
 
 
+@internal_api_call
+@provide_session
+def _update_ti_heartbeat(id: str, when: datetime, session: Session = 
NEW_SESSION):
+    session.execute(update(TaskInstance).where(TaskInstance.id == 
id).values(last_heartbeat_at=when))
+
+
 def _run_raw_task(
     ti: TaskInstance | TaskInstancePydantic,
     mark_success: bool = False,
     test_mode: bool = False,
-    job_id: str | None = None,
     pool: str | None = None,
     raise_on_defer: bool = False,
     session: Session | None = None,
@@ -249,7 +254,6 @@ def _run_raw_task(
     ti.test_mode = test_mode
     ti.refresh_from_task(ti.task, pool_override=pool)
     ti.refresh_from_db(session=session)
-    ti.job_id = job_id
     ti.hostname = get_hostname()
     ti.pid = os.getpid()
     if not test_mode:
@@ -451,7 +455,6 @@ def clear_task_instances(
         If set to False, DagRuns state will not be changed.
     :param dag: DAG object
     """
-    job_ids = []
     # Keys: dag_id -> run_id -> map_indexes -> try_numbers -> task_id
     task_id_by_key: dict[str, dict[str, dict[int, dict[int, set[str]]]]] = 
defaultdict(
         lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(set)))
@@ -462,11 +465,9 @@ def clear_task_instances(
     for ti in tis:
         TaskInstanceHistory.record_ti(ti, session)
         if ti.state == TaskInstanceState.RUNNING:
-            if ti.job_id:
-                # If a task is cleared when running, set its state to 
RESTARTING so that
-                # the task is terminated and becomes eligible for retry.
-                ti.state = TaskInstanceState.RESTARTING
-                job_ids.append(ti.job_id)
+            # If a task is cleared when running, set its state to RESTARTING 
so that
+            # the task is terminated and becomes eligible for retry.
+            ti.state = TaskInstanceState.RESTARTING
         else:
             ti_dag = dag if dag and dag.dag_id == ti.dag_id else 
dag_bag.get_dag(ti.dag_id, session=session)
             task_id = ti.task_id
@@ -522,11 +523,6 @@ def clear_task_instances(
         delete_qry = TR.__table__.delete().where(conditions)
         session.execute(delete_qry)
 
-    if job_ids:
-        from airflow.jobs.job import Job
-
-        
session.execute(update(Job).where(Job.id.in_(job_ids)).values(state=JobState.RESTARTING))
-
     if dag_run_state is not False and tis:
         from airflow.models.dagrun import DagRun  # Avoid circular import
 
@@ -806,7 +802,6 @@ def _set_ti_attrs(target, source, include_dag_run=False):
     target.max_tries = source.max_tries
     target.hostname = source.hostname
     target.unixname = source.unixname
-    target.job_id = source.job_id
     target.pool = source.pool
     target.pool_slots = source.pool_slots or 1
     target.queue = source.queue
@@ -815,6 +810,7 @@ def _set_ti_attrs(target, source, include_dag_run=False):
     target.custom_operator_name = source.custom_operator_name
     target.queued_dttm = source.queued_dttm
     target.queued_by_job_id = source.queued_by_job_id
+    target.last_heartbeat_at = source.last_heartbeat_at
     target.pid = source.pid
     target.executor = source.executor
     target.executor_config = source.executor_config
@@ -1844,7 +1840,6 @@ class TaskInstance(Base, LoggingMixin):
     max_tries = Column(Integer, server_default=text("-1"))
     hostname = Column(String(1000))
     unixname = Column(String(1000))
-    job_id = Column(Integer)
     pool = Column(String(256), nullable=False)
     pool_slots = Column(Integer, default=1, nullable=False)
     queue = Column(String(256))
@@ -1853,6 +1848,8 @@ class TaskInstance(Base, LoggingMixin):
     custom_operator_name = Column(String(1000))
     queued_dttm = Column(UtcDateTime)
     queued_by_job_id = Column(Integer)
+
+    last_heartbeat_at = Column(UtcDateTime)
     pid = Column(Integer)
     executor = Column(String(1000))
     executor_config = Column(ExecutorConfigType(pickler=dill))
@@ -1885,8 +1882,8 @@ class TaskInstance(Base, LoggingMixin):
         Index("ti_state", state),
         Index("ti_state_lkp", dag_id, task_id, run_id, state),
         Index("ti_pool", pool, state, priority_weight),
-        Index("ti_job_id", job_id),
         Index("ti_trigger_id", trigger_id),
+        Index("ti_heartbeat", last_heartbeat_at),
         PrimaryKeyConstraint("id", name="task_instance_pkey"),
         UniqueConstraint("dag_id", "task_id", "run_id", "map_index", 
name="task_instance_composite_key"),
         ForeignKeyConstraint(
@@ -2035,7 +2032,6 @@ class TaskInstance(Base, LoggingMixin):
         local: bool = False,
         pickle_id: int | None = None,
         raw: bool = False,
-        job_id: str | None = None,
         pool: str | None = None,
         cfg_path: str | None = None,
     ) -> list[str]:
@@ -2074,7 +2070,6 @@ class TaskInstance(Base, LoggingMixin):
             pickle_id=pickle_id,
             file_path=path,
             raw=raw,
-            job_id=job_id,
             pool=pool,
             cfg_path=cfg_path,
             map_index=ti.map_index,
@@ -2091,7 +2086,6 @@ class TaskInstance(Base, LoggingMixin):
         local: bool = False,
         pickle_id: int | None = None,
         raw: bool = False,
-        job_id: str | None = None,
         pool: str | None = None,
         cfg_path: str | None = None,
     ) -> list[str]:
@@ -2111,7 +2105,6 @@ class TaskInstance(Base, LoggingMixin):
             local=local,
             pickle_id=pickle_id,
             raw=raw,
-            job_id=job_id,
             pool=pool,
             cfg_path=cfg_path,
         )
@@ -2131,7 +2124,6 @@ class TaskInstance(Base, LoggingMixin):
         pickle_id: int | None = None,
         file_path: PurePath | str | None = None,
         raw: bool = False,
-        job_id: str | None = None,
         pool: str | None = None,
         cfg_path: str | None = None,
         map_index: int = -1,
@@ -2156,7 +2148,6 @@ class TaskInstance(Base, LoggingMixin):
             associated with the pickled DAG
         :param file_path: path to the file containing the DAG definition
         :param raw: raw mode (needs more details)
-        :param job_id: job ID (needs more details)
         :param pool: the Airflow pool that the task should run in
         :param cfg_path: the Path to the configuration file
         :return: shell command that can be used to run the task instance
@@ -2166,8 +2157,6 @@ class TaskInstance(Base, LoggingMixin):
             cmd.extend(["--mark-success"])
         if pickle_id:
             cmd.extend(["--pickle", str(pickle_id)])
-        if job_id:
-            cmd.extend(["--job-id", str(job_id)])
         if ignore_all_deps:
             cmd.extend(["--ignore-all-dependencies"])
         if ignore_task_deps:
@@ -2641,7 +2630,6 @@ class TaskInstance(Base, LoggingMixin):
         mark_success: bool = False,
         test_mode: bool = False,
         hostname: str = "",
-        job_id: str | None = None,
         pool: str | None = None,
         external_executor_id: str | None = None,
         session: Session = NEW_SESSION,
@@ -2661,7 +2649,6 @@ class TaskInstance(Base, LoggingMixin):
         :param mark_success: Don't run the task, mark its state as success
         :param test_mode: Doesn't record success or failure in the DB
         :param hostname: The hostname of the worker running the task instance.
-        :param job_id: Job (LocalTaskJob / SchedulerJob) ID
         :param pool: specifies the pool to use to run the task instance
         :param external_executor_id: The identifier of the celery executor
         :param session: SQLAlchemy ORM Session
@@ -2684,7 +2671,6 @@ class TaskInstance(Base, LoggingMixin):
         ti.refresh_from_task(task, pool_override=pool)
         ti.test_mode = test_mode
         ti.refresh_from_db(session=session, lock_for_update=True)
-        ti.job_id = job_id
         ti.hostname = hostname
         ti.pid = None
 
@@ -2789,7 +2775,6 @@ class TaskInstance(Base, LoggingMixin):
         ignore_ti_state: bool = False,
         mark_success: bool = False,
         test_mode: bool = False,
-        job_id: str | None = None,
         pool: str | None = None,
         external_executor_id: str | None = None,
         session: Session = NEW_SESSION,
@@ -2805,7 +2790,6 @@ class TaskInstance(Base, LoggingMixin):
             mark_success=mark_success,
             test_mode=test_mode,
             hostname=get_hostname(),
-            job_id=job_id,
             pool=pool,
             external_executor_id=external_executor_id,
             session=session,
@@ -2876,7 +2860,6 @@ class TaskInstance(Base, LoggingMixin):
         self,
         mark_success: bool = False,
         test_mode: bool = False,
-        job_id: str | None = None,
         pool: str | None = None,
         raise_on_defer: bool = False,
         session: Session = NEW_SESSION,
@@ -2901,7 +2884,6 @@ class TaskInstance(Base, LoggingMixin):
             ti=self,
             mark_success=mark_success,
             test_mode=test_mode,
-            job_id=job_id,
             pool=pool,
             raise_on_defer=raise_on_defer,
             session=session,
@@ -3071,6 +3053,11 @@ class TaskInstance(Base, LoggingMixin):
         """
         return _execute_task(self, context, task_orig)
 
+    def update_heartbeat(self):
+        cm = nullcontext() if InternalApiConfig.get_use_internal_api() else 
create_session()
+        with cm as session_or_null:
+            _update_ti_heartbeat(self.id, timezone.utcnow(), session_or_null)
+
     @provide_session
     def defer_task(self, exception: TaskDeferred | None, session: Session = 
NEW_SESSION) -> None:
         """
@@ -3101,7 +3088,6 @@ class TaskInstance(Base, LoggingMixin):
         ignore_ti_state: bool = False,
         mark_success: bool = False,
         test_mode: bool = False,
-        job_id: str | None = None,
         pool: str | None = None,
         session: Session = NEW_SESSION,
         raise_on_defer: bool = False,
@@ -3116,7 +3102,6 @@ class TaskInstance(Base, LoggingMixin):
             ignore_ti_state=ignore_ti_state,
             mark_success=mark_success,
             test_mode=test_mode,
-            job_id=job_id,
             pool=pool,
             session=session,
         )
@@ -3126,7 +3111,6 @@ class TaskInstance(Base, LoggingMixin):
         self._run_raw_task(
             mark_success=mark_success,
             test_mode=test_mode,
-            job_id=job_id,
             pool=pool,
             session=session,
             raise_on_defer=raise_on_defer,
diff --git a/airflow/models/taskinstancehistory.py 
b/airflow/models/taskinstancehistory.py
index ccdca700af6..8c77daf9257 100644
--- a/airflow/models/taskinstancehistory.py
+++ b/airflow/models/taskinstancehistory.py
@@ -70,7 +70,6 @@ class TaskInstanceHistory(Base):
     max_tries = Column(Integer, server_default=text("-1"))
     hostname = Column(String(1000))
     unixname = Column(String(1000))
-    job_id = Column(Integer)
     pool = Column(String(256), nullable=False)
     pool_slots = Column(Integer, default=1, nullable=False)
     queue = Column(String(256))
diff --git a/airflow/serialization/pydantic/taskinstance.py 
b/airflow/serialization/pydantic/taskinstance.py
index caf44bea4c6..bf121353ca8 100644
--- a/airflow/serialization/pydantic/taskinstance.py
+++ b/airflow/serialization/pydantic/taskinstance.py
@@ -40,6 +40,7 @@ from airflow.models.taskinstance import (
 )
 from airflow.serialization.pydantic.dag import DagModelPydantic
 from airflow.serialization.pydantic.dag_run import DagRunPydantic
+from airflow.utils import timezone
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.net import get_hostname
 from airflow.utils.xcom import XCOM_RETURN_KEY
@@ -83,6 +84,7 @@ PydanticOperator = Annotated[
 class TaskInstancePydantic(BaseModelPydantic, LoggingMixin):
     """Serializable representation of the TaskInstance ORM SqlAlchemyModel 
used by internal API."""
 
+    id: str
     task_id: str
     dag_id: str
     run_id: str
@@ -96,7 +98,6 @@ class TaskInstancePydantic(BaseModelPydantic, LoggingMixin):
     max_tries: int
     hostname: str
     unixname: str
-    job_id: Optional[int]
     pool: str
     pool_slots: int
     queue: str
@@ -105,6 +106,7 @@ class TaskInstancePydantic(BaseModelPydantic, LoggingMixin):
     custom_operator_name: Optional[str]
     queued_dttm: Optional[datetime]
     queued_by_job_id: Optional[int]
+    last_heartbeat_at: Optional[datetime] = None
     pid: Optional[int]
     executor: Optional[str]
     executor_config: Any
@@ -138,7 +140,6 @@ class TaskInstancePydantic(BaseModelPydantic, LoggingMixin):
         self,
         mark_success: bool = False,
         test_mode: bool = False,
-        job_id: str | None = None,
         pool: str | None = None,
         raise_on_defer: bool = False,
         session: Session | None = None,
@@ -147,7 +148,6 @@ class TaskInstancePydantic(BaseModelPydantic, LoggingMixin):
             ti=self,
             mark_success=mark_success,
             test_mode=test_mode,
-            job_id=job_id,
             pool=pool,
             raise_on_defer=raise_on_defer,
             session=session,
@@ -252,6 +252,12 @@ class TaskInstancePydantic(BaseModelPydantic, 
LoggingMixin):
 
         _refresh_from_db(task_instance=self, session=session, 
lock_for_update=lock_for_update)
 
+    def update_heartbeat(self):
+        """Update the recorded heartbeat for this task to "now"."""
+        from airflow.models.taskinstance import _update_ti_heartbeat
+
+        return _update_ti_heartbeat(self.id, timezone.utcnow())
+
     def set_duration(self) -> None:
         """Set task instance duration."""
         from airflow.models.taskinstance import _set_duration
@@ -441,7 +447,6 @@ class TaskInstancePydantic(BaseModelPydantic, LoggingMixin):
         ignore_ti_state: bool = False,
         mark_success: bool = False,
         test_mode: bool = False,
-        job_id: str | None = None,
         pool: str | None = None,
         external_executor_id: str | None = None,
         session: Session | None = None,
@@ -457,7 +462,6 @@ class TaskInstancePydantic(BaseModelPydantic, LoggingMixin):
             mark_success=mark_success,
             test_mode=test_mode,
             hostname=get_hostname(),
-            job_id=job_id,
             pool=pool,
             external_executor_id=external_executor_id,
             session=session,
@@ -484,7 +488,6 @@ class TaskInstancePydantic(BaseModelPydantic, LoggingMixin):
         local: bool = False,
         pickle_id: int | None = None,
         raw: bool = False,
-        job_id: str | None = None,
         pool: str | None = None,
         cfg_path: str | None = None,
     ) -> list[str]:
@@ -504,7 +507,6 @@ class TaskInstancePydantic(BaseModelPydantic, LoggingMixin):
             local=local,
             pickle_id=pickle_id,
             raw=raw,
-            job_id=job_id,
             pool=pool,
             cfg_path=cfg_path,
         )
diff --git a/airflow/task/standard_task_runner.py 
b/airflow/task/standard_task_runner.py
index d7f75f40e17..a5641002c96 100644
--- a/airflow/task/standard_task_runner.py
+++ b/airflow/task/standard_task_runner.py
@@ -101,7 +101,6 @@ class StandardTaskRunner(LoggingMixin):
             raw=True,
             pickle_id=self.job_runner.pickle_id,
             mark_success=self.job_runner.mark_success,
-            job_id=self.job_runner.job.id,
             pool=self.job_runner.pool,
             cfg_path=cfg_path,
         )
@@ -159,15 +158,10 @@ class StandardTaskRunner(LoggingMixin):
             # [1:] - remove "airflow" from the start of the command
             args = parser.parse_args(self._command[1:])
 
-            # We prefer the job_id passed on the command-line because at this 
time, the
-            # task instance may not have been updated.
-            job_id = getattr(args, "job_id", self._task_instance.job_id)
             self.log.info("Running: %s", self._command)
-            self.log.info("Job %s: Subtask %s", job_id, 
self._task_instance.task_id)
+            self.log.info("Subtask %s", self._task_instance.task_id)
 
             proc_title = "airflow task runner: {0.dag_id} {0.task_id} 
{0.execution_date_or_run_id}"
-            if job_id is not None:
-                proc_title += " {0.job_id}"
             setproctitle(proc_title.format(args))
             return_code = 0
             try:
@@ -179,15 +173,11 @@ class StandardTaskRunner(LoggingMixin):
                     return_code = 0
                     if isinstance(ret, TaskReturnCode):
                         return_code = ret.value
-            except Exception as exc:
+            except Exception:
                 return_code = 1
 
                 self.log.exception(
-                    "Failed to execute job %s for task %s (%s; %r)",
-                    job_id,
-                    self._task_instance.task_id,
-                    exc,
-                    os.getpid(),
+                    "Failed to execute task_id=%s pid=%r", 
self._task_instance.task_id, os.getpid()
                 )
             except SystemExit as sys_ex:
                 # Someone called sys.exit() in the fork - mistakenly. You 
should not run sys.exit() in
@@ -250,10 +240,10 @@ class StandardTaskRunner(LoggingMixin):
         if self._rc == -signal.SIGKILL:
             self.log.error(
                 (
-                    "Job %s was killed before it finished (likely due to 
running out of memory)",
+                    "TI %s was killed before it finished (likely due to 
running out of memory)",
                     "For more information, see 
https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#LocalTaskJob-killed",
                 ),
-                self._task_instance.job_id,
+                self._task_instance.id,
             )
 
     def get_process_pid(self) -> int:
@@ -286,8 +276,7 @@ class StandardTaskRunner(LoggingMixin):
             if not line:
                 break
             self.log.info(
-                "Job %s: Subtask %s %s",
-                self._task_instance.job_id,
+                "Task %s %s",
                 self._task_instance.task_id,
                 line.rstrip("\n"),
             )
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 2c82f5f1948..dd3e8c5d200 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -97,7 +97,7 @@ _REVISION_HEADS_MAP: dict[str, str] = {
     "2.9.2": "686269002441",
     "2.10.0": "22ed7efa9da2",
     "2.10.3": "5f2621c13b39",
-    "3.0.0": "5f57a45b8433",
+    "3.0.0": "d8cd3297971e",
 }
 
 
diff --git a/airflow/www/views.py b/airflow/www/views.py
index d50d7bb2e78..e287c027a89 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -5121,7 +5121,6 @@ class TaskInstanceModelView(AirflowModelView):
         "end_date",
         "duration",
         "note",
-        "job_id",
         "hostname",
         "unixname",
         "priority_weight",
@@ -5146,7 +5145,6 @@ class TaskInstanceModelView(AirflowModelView):
         "end_date",
         "duration",
         # "note",  # TODO: Maybe figure out how to re-enable this.
-        "job_id",
         "hostname",
         "unixname",
         "priority_weight",
@@ -5192,7 +5190,7 @@ class TaskInstanceModelView(AirflowModelView):
 
     edit_form = TaskInstanceEditForm
 
-    base_order = ("job_id", "asc")
+    base_order = ("queued_dttm", "asc")
 
     base_filters = [["dag_id", DagFilter, list]]
 
diff --git a/docs/apache-airflow/img/airflow_erd.sha256 
b/docs/apache-airflow/img/airflow_erd.sha256
index f278eee7d05..8adffd106ea 100644
--- a/docs/apache-airflow/img/airflow_erd.sha256
+++ b/docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
-9b9dcf915eff051a5cd77176a78bdcca3703b227373efe83fd0a1d4d05623c28
\ No newline at end of file
+1d781ee92cc59e7647d7f72ddc542b7f17e03fc8b822950db74415c38279d40f
\ No newline at end of file
diff --git a/docs/apache-airflow/img/airflow_erd.svg 
b/docs/apache-airflow/img/airflow_erd.svg
index 177c5a60f14..1b0d5b346c9 100644
--- a/docs/apache-airflow/img/airflow_erd.svg
+++ b/docs/apache-airflow/img/airflow_erd.svg
@@ -1144,9 +1144,9 @@
 <text text-anchor="start" x="1024" y="-3145.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
 <text text-anchor="start" x="1029" y="-3145.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(1000)]</text>
 <polygon fill="none" stroke="black" points="949,-3111 949,-3136 1257,-3136 
1257,-3111 949,-3111"/>
-<text text-anchor="start" x="954" y="-3120.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">job_id</text>
-<text text-anchor="start" x="995" y="-3120.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1000" y="-3120.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
+<text text-anchor="start" x="954" y="-3120.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">last_heartbeat_at</text>
+<text text-anchor="start" x="1074" y="-3120.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1079" y="-3120.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
 <polygon fill="none" stroke="black" points="949,-3086 949,-3111 1257,-3111 
1257,-3086 949,-3086"/>
 <text text-anchor="start" x="954" y="-3095.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">map_index</text>
 <text text-anchor="start" x="1030" y="-3095.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
@@ -1708,176 +1708,172 @@
 <!-- task_instance_history -->
 <g id="node39" class="node">
 <title>task_instance_history</title>
-<polygon fill="none" stroke="black" points="1358,-2320 1358,-2348 1666,-2348 
1666,-2320 1358,-2320"/>
-<text text-anchor="start" x="1414.5" y="-2331.2" 
font-family="Helvetica,sans-Serif" font-weight="bold" 
font-size="16.00">task_instance_history</text>
-<polygon fill="none" stroke="black" points="1358,-2295 1358,-2320 1666,-2320 
1666,-2295 1358,-2295"/>
-<text text-anchor="start" x="1363" y="-2304.8" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">id</text>
-<text text-anchor="start" x="1376" y="-2304.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1381" y="-2304.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
-<text text-anchor="start" x="1458" y="-2304.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="1358,-2270 1358,-2295 1666,-2295 
1666,-2270 1358,-2270"/>
-<text text-anchor="start" x="1363" y="-2279.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">custom_operator_name</text>
-<text text-anchor="start" x="1526" y="-2279.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1531" y="-2279.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(1000)]</text>
-<polygon fill="none" stroke="black" points="1358,-2245 1358,-2270 1666,-2270 
1666,-2245 1358,-2245"/>
-<text text-anchor="start" x="1363" y="-2254.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">dag_id</text>
-<text text-anchor="start" x="1409" y="-2254.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1414" y="-2254.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(250)]</text>
-<text text-anchor="start" x="1535" y="-2254.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="1358,-2220 1358,-2245 1666,-2245 
1666,-2220 1358,-2220"/>
-<text text-anchor="start" x="1363" y="-2229.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">duration</text>
-<text text-anchor="start" x="1422" y="-2229.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1427" y="-2229.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [DOUBLE_PRECISION]</text>
-<polygon fill="none" stroke="black" points="1358,-2195 1358,-2220 1666,-2220 
1666,-2195 1358,-2195"/>
-<text text-anchor="start" x="1363" y="-2204.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">end_date</text>
-<text text-anchor="start" x="1427" y="-2204.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1432" y="-2204.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
-<polygon fill="none" stroke="black" points="1358,-2170 1358,-2195 1666,-2195 
1666,-2170 1358,-2170"/>
-<text text-anchor="start" x="1363" y="-2179.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">executor</text>
-<text text-anchor="start" x="1424" y="-2179.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1429" y="-2179.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(1000)]</text>
-<polygon fill="none" stroke="black" points="1358,-2145 1358,-2170 1666,-2170 
1666,-2145 1358,-2145"/>
-<text text-anchor="start" x="1363" y="-2154.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">executor_config</text>
-<text text-anchor="start" x="1473" y="-2154.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1478" y="-2154.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [BYTEA]</text>
-<polygon fill="none" stroke="black" points="1358,-2120 1358,-2145 1666,-2145 
1666,-2120 1358,-2120"/>
-<text text-anchor="start" x="1363" y="-2129.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">external_executor_id</text>
-<text text-anchor="start" x="1506" y="-2129.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1511" y="-2129.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(250)]</text>
-<polygon fill="none" stroke="black" points="1358,-2095 1358,-2120 1666,-2120 
1666,-2095 1358,-2095"/>
-<text text-anchor="start" x="1363" y="-2104.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">hostname</text>
-<text text-anchor="start" x="1433" y="-2104.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1438" y="-2104.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(1000)]</text>
-<polygon fill="none" stroke="black" points="1358,-2070 1358,-2095 1666,-2095 
1666,-2070 1358,-2070"/>
-<text text-anchor="start" x="1363" y="-2079.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">job_id</text>
-<text text-anchor="start" x="1404" y="-2079.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1409" y="-2079.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
-<polygon fill="none" stroke="black" points="1358,-2045 1358,-2070 1666,-2070 
1666,-2045 1358,-2045"/>
-<text text-anchor="start" x="1363" y="-2054.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">map_index</text>
-<text text-anchor="start" x="1439" y="-2054.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1444" y="-2054.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
-<text text-anchor="start" x="1521" y="-2054.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="1358,-2020 1358,-2045 1666,-2045 
1666,-2020 1358,-2020"/>
-<text text-anchor="start" x="1363" y="-2029.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">max_tries</text>
-<text text-anchor="start" x="1431" y="-2029.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1436" y="-2029.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
-<polygon fill="none" stroke="black" points="1358,-1995 1358,-2020 1666,-2020 
1666,-1995 1358,-1995"/>
-<text text-anchor="start" x="1363" y="-2004.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">next_kwargs</text>
-<text text-anchor="start" x="1451" y="-2004.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1456" y="-2004.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [JSON]</text>
-<polygon fill="none" stroke="black" points="1358,-1970 1358,-1995 1666,-1995 
1666,-1970 1358,-1970"/>
-<text text-anchor="start" x="1363" y="-1979.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">next_method</text>
-<text text-anchor="start" x="1454" y="-1979.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1459" y="-1979.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(1000)]</text>
-<polygon fill="none" stroke="black" points="1358,-1945 1358,-1970 1666,-1970 
1666,-1945 1358,-1945"/>
-<text text-anchor="start" x="1363" y="-1954.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">operator</text>
-<text text-anchor="start" x="1423" y="-1954.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1428" y="-1954.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(1000)]</text>
-<polygon fill="none" stroke="black" points="1358,-1920 1358,-1945 1666,-1945 
1666,-1920 1358,-1920"/>
-<text text-anchor="start" x="1363" y="-1929.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">pid</text>
-<text text-anchor="start" x="1385" y="-1929.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1390" y="-1929.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
-<polygon fill="none" stroke="black" points="1358,-1895 1358,-1920 1666,-1920 
1666,-1895 1358,-1895"/>
-<text text-anchor="start" x="1363" y="-1904.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">pool</text>
-<text text-anchor="start" x="1393" y="-1904.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1398" y="-1904.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(256)]</text>
-<text text-anchor="start" x="1519" y="-1904.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="1358,-1870 1358,-1895 1666,-1895 
1666,-1870 1358,-1870"/>
-<text text-anchor="start" x="1363" y="-1879.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">pool_slots</text>
-<text text-anchor="start" x="1432" y="-1879.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1437" y="-1879.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
-<text text-anchor="start" x="1514" y="-1879.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="1358,-1845 1358,-1870 1666,-1870 
1666,-1845 1358,-1845"/>
-<text text-anchor="start" x="1363" y="-1854.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">priority_weight</text>
-<text text-anchor="start" x="1467" y="-1854.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1472" y="-1854.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
-<polygon fill="none" stroke="black" points="1358,-1820 1358,-1845 1666,-1845 
1666,-1820 1358,-1820"/>
-<text text-anchor="start" x="1363" y="-1829.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">queue</text>
-<text text-anchor="start" x="1407" y="-1829.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1412" y="-1829.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(256)]</text>
-<polygon fill="none" stroke="black" points="1358,-1795 1358,-1820 1666,-1820 
1666,-1795 1358,-1795"/>
-<text text-anchor="start" x="1363" y="-1804.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">queued_by_job_id</text>
-<text text-anchor="start" x="1487" y="-1804.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1492" y="-1804.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
-<polygon fill="none" stroke="black" points="1358,-1770 1358,-1795 1666,-1795 
1666,-1770 1358,-1770"/>
-<text text-anchor="start" x="1363" y="-1779.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">queued_dttm</text>
-<text text-anchor="start" x="1456" y="-1779.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1461" y="-1779.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
-<polygon fill="none" stroke="black" points="1358,-1745 1358,-1770 1666,-1770 
1666,-1745 1358,-1745"/>
-<text text-anchor="start" x="1363" y="-1754.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">rendered_map_index</text>
-<text text-anchor="start" x="1508" y="-1754.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1513" y="-1754.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(250)]</text>
-<polygon fill="none" stroke="black" points="1358,-1720 1358,-1745 1666,-1745 
1666,-1720 1358,-1720"/>
-<text text-anchor="start" x="1363" y="-1729.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">run_id</text>
-<text text-anchor="start" x="1407" y="-1729.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1412" y="-1729.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(250)]</text>
-<text text-anchor="start" x="1533" y="-1729.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="1358,-1695 1358,-1720 1666,-1720 
1666,-1695 1358,-1695"/>
-<text text-anchor="start" x="1363" y="-1704.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">start_date</text>
-<text text-anchor="start" x="1433" y="-1704.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1438" y="-1704.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
-<polygon fill="none" stroke="black" points="1358,-1670 1358,-1695 1666,-1695 
1666,-1670 1358,-1670"/>
-<text text-anchor="start" x="1363" y="-1679.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">state</text>
-<text text-anchor="start" x="1398" y="-1679.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1403" y="-1679.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(20)]</text>
-<polygon fill="none" stroke="black" points="1358,-1645 1358,-1670 1666,-1670 
1666,-1645 1358,-1645"/>
-<text text-anchor="start" x="1363" y="-1654.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">task_display_name</text>
-<text text-anchor="start" x="1495" y="-1654.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1500" y="-1654.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(2000)]</text>
-<polygon fill="none" stroke="black" points="1358,-1620 1358,-1645 1666,-1645 
1666,-1620 1358,-1620"/>
-<text text-anchor="start" x="1363" y="-1629.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">task_id</text>
-<text text-anchor="start" x="1412" y="-1629.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1417" y="-1629.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(250)]</text>
-<text text-anchor="start" x="1538" y="-1629.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="1358,-1595 1358,-1620 1666,-1620 
1666,-1595 1358,-1595"/>
-<text text-anchor="start" x="1363" y="-1604.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">trigger_id</text>
-<text text-anchor="start" x="1430" y="-1604.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1435" y="-1604.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
-<polygon fill="none" stroke="black" points="1358,-1570 1358,-1595 1666,-1595 
1666,-1570 1358,-1570"/>
-<text text-anchor="start" x="1363" y="-1579.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">trigger_timeout</text>
-<text text-anchor="start" x="1471" y="-1579.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1476" y="-1579.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
-<polygon fill="none" stroke="black" points="1358,-1545 1358,-1570 1666,-1570 
1666,-1545 1358,-1545"/>
-<text text-anchor="start" x="1363" y="-1554.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">try_number</text>
-<text text-anchor="start" x="1445" y="-1554.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1450" y="-1554.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
-<text text-anchor="start" x="1527" y="-1554.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="1358,-1520 1358,-1545 1666,-1545 
1666,-1520 1358,-1520"/>
-<text text-anchor="start" x="1363" y="-1529.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">unixname</text>
-<text text-anchor="start" x="1433" y="-1529.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1438" y="-1529.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(1000)]</text>
-<polygon fill="none" stroke="black" points="1358,-1495 1358,-1520 1666,-1520 
1666,-1495 1358,-1495"/>
-<text text-anchor="start" x="1363" y="-1504.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">updated_at</text>
-<text text-anchor="start" x="1442" y="-1504.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1447" y="-1504.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
+<polygon fill="none" stroke="black" points="1358,-2321 1358,-2349 1666,-2349 
1666,-2321 1358,-2321"/>
+<text text-anchor="start" x="1414.5" y="-2332.2" 
font-family="Helvetica,sans-Serif" font-weight="bold" 
font-size="16.00">task_instance_history</text>
+<polygon fill="none" stroke="black" points="1358,-2296 1358,-2321 1666,-2321 
1666,-2296 1358,-2296"/>
+<text text-anchor="start" x="1363" y="-2305.8" 
font-family="Helvetica,sans-Serif" text-decoration="underline" 
font-size="14.00">id</text>
+<text text-anchor="start" x="1376" y="-2305.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1381" y="-2305.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
+<text text-anchor="start" x="1458" y="-2305.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="1358,-2271 1358,-2296 1666,-2296 
1666,-2271 1358,-2271"/>
+<text text-anchor="start" x="1363" y="-2280.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">custom_operator_name</text>
+<text text-anchor="start" x="1526" y="-2280.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1531" y="-2280.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(1000)]</text>
+<polygon fill="none" stroke="black" points="1358,-2246 1358,-2271 1666,-2271 
1666,-2246 1358,-2246"/>
+<text text-anchor="start" x="1363" y="-2255.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">dag_id</text>
+<text text-anchor="start" x="1409" y="-2255.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1414" y="-2255.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(250)]</text>
+<text text-anchor="start" x="1535" y="-2255.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="1358,-2221 1358,-2246 1666,-2246 
1666,-2221 1358,-2221"/>
+<text text-anchor="start" x="1363" y="-2230.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">duration</text>
+<text text-anchor="start" x="1422" y="-2230.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1427" y="-2230.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [DOUBLE_PRECISION]</text>
+<polygon fill="none" stroke="black" points="1358,-2196 1358,-2221 1666,-2221 
1666,-2196 1358,-2196"/>
+<text text-anchor="start" x="1363" y="-2205.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">end_date</text>
+<text text-anchor="start" x="1427" y="-2205.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1432" y="-2205.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
+<polygon fill="none" stroke="black" points="1358,-2171 1358,-2196 1666,-2196 
1666,-2171 1358,-2171"/>
+<text text-anchor="start" x="1363" y="-2180.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">executor</text>
+<text text-anchor="start" x="1424" y="-2180.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1429" y="-2180.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(1000)]</text>
+<polygon fill="none" stroke="black" points="1358,-2146 1358,-2171 1666,-2171 
1666,-2146 1358,-2146"/>
+<text text-anchor="start" x="1363" y="-2155.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">executor_config</text>
+<text text-anchor="start" x="1473" y="-2155.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1478" y="-2155.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [BYTEA]</text>
+<polygon fill="none" stroke="black" points="1358,-2121 1358,-2146 1666,-2146 
1666,-2121 1358,-2121"/>
+<text text-anchor="start" x="1363" y="-2130.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">external_executor_id</text>
+<text text-anchor="start" x="1506" y="-2130.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1511" y="-2130.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(250)]</text>
+<polygon fill="none" stroke="black" points="1358,-2096 1358,-2121 1666,-2121 
1666,-2096 1358,-2096"/>
+<text text-anchor="start" x="1363" y="-2105.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">hostname</text>
+<text text-anchor="start" x="1433" y="-2105.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1438" y="-2105.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(1000)]</text>
+<polygon fill="none" stroke="black" points="1358,-2071 1358,-2096 1666,-2096 
1666,-2071 1358,-2071"/>
+<text text-anchor="start" x="1363" y="-2080.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">map_index</text>
+<text text-anchor="start" x="1439" y="-2080.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1444" y="-2080.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
+<text text-anchor="start" x="1521" y="-2080.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="1358,-2046 1358,-2071 1666,-2071 
1666,-2046 1358,-2046"/>
+<text text-anchor="start" x="1363" y="-2055.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">max_tries</text>
+<text text-anchor="start" x="1431" y="-2055.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1436" y="-2055.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
+<polygon fill="none" stroke="black" points="1358,-2021 1358,-2046 1666,-2046 
1666,-2021 1358,-2021"/>
+<text text-anchor="start" x="1363" y="-2030.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">next_kwargs</text>
+<text text-anchor="start" x="1451" y="-2030.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1456" y="-2030.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [JSON]</text>
+<polygon fill="none" stroke="black" points="1358,-1996 1358,-2021 1666,-2021 
1666,-1996 1358,-1996"/>
+<text text-anchor="start" x="1363" y="-2005.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">next_method</text>
+<text text-anchor="start" x="1454" y="-2005.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1459" y="-2005.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(1000)]</text>
+<polygon fill="none" stroke="black" points="1358,-1971 1358,-1996 1666,-1996 
1666,-1971 1358,-1971"/>
+<text text-anchor="start" x="1363" y="-1980.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">operator</text>
+<text text-anchor="start" x="1423" y="-1980.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1428" y="-1980.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(1000)]</text>
+<polygon fill="none" stroke="black" points="1358,-1946 1358,-1971 1666,-1971 
1666,-1946 1358,-1946"/>
+<text text-anchor="start" x="1363" y="-1955.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">pid</text>
+<text text-anchor="start" x="1385" y="-1955.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1390" y="-1955.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
+<polygon fill="none" stroke="black" points="1358,-1921 1358,-1946 1666,-1946 
1666,-1921 1358,-1921"/>
+<text text-anchor="start" x="1363" y="-1930.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">pool</text>
+<text text-anchor="start" x="1393" y="-1930.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1398" y="-1930.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(256)]</text>
+<text text-anchor="start" x="1519" y="-1930.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="1358,-1896 1358,-1921 1666,-1921 
1666,-1896 1358,-1896"/>
+<text text-anchor="start" x="1363" y="-1905.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">pool_slots</text>
+<text text-anchor="start" x="1432" y="-1905.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1437" y="-1905.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
+<text text-anchor="start" x="1514" y="-1905.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="1358,-1871 1358,-1896 1666,-1896 
1666,-1871 1358,-1871"/>
+<text text-anchor="start" x="1363" y="-1880.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">priority_weight</text>
+<text text-anchor="start" x="1467" y="-1880.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1472" y="-1880.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
+<polygon fill="none" stroke="black" points="1358,-1846 1358,-1871 1666,-1871 
1666,-1846 1358,-1846"/>
+<text text-anchor="start" x="1363" y="-1855.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">queue</text>
+<text text-anchor="start" x="1407" y="-1855.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1412" y="-1855.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(256)]</text>
+<polygon fill="none" stroke="black" points="1358,-1821 1358,-1846 1666,-1846 
1666,-1821 1358,-1821"/>
+<text text-anchor="start" x="1363" y="-1830.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">queued_by_job_id</text>
+<text text-anchor="start" x="1487" y="-1830.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1492" y="-1830.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
+<polygon fill="none" stroke="black" points="1358,-1796 1358,-1821 1666,-1821 
1666,-1796 1358,-1796"/>
+<text text-anchor="start" x="1363" y="-1805.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">queued_dttm</text>
+<text text-anchor="start" x="1456" y="-1805.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1461" y="-1805.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
+<polygon fill="none" stroke="black" points="1358,-1771 1358,-1796 1666,-1796 
1666,-1771 1358,-1771"/>
+<text text-anchor="start" x="1363" y="-1780.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">rendered_map_index</text>
+<text text-anchor="start" x="1508" y="-1780.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1513" y="-1780.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(250)]</text>
+<polygon fill="none" stroke="black" points="1358,-1746 1358,-1771 1666,-1771 
1666,-1746 1358,-1746"/>
+<text text-anchor="start" x="1363" y="-1755.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">run_id</text>
+<text text-anchor="start" x="1407" y="-1755.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1412" y="-1755.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(250)]</text>
+<text text-anchor="start" x="1533" y="-1755.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="1358,-1721 1358,-1746 1666,-1746 
1666,-1721 1358,-1721"/>
+<text text-anchor="start" x="1363" y="-1730.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">start_date</text>
+<text text-anchor="start" x="1433" y="-1730.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1438" y="-1730.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
+<polygon fill="none" stroke="black" points="1358,-1696 1358,-1721 1666,-1721 
1666,-1696 1358,-1696"/>
+<text text-anchor="start" x="1363" y="-1705.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">state</text>
+<text text-anchor="start" x="1398" y="-1705.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1403" y="-1705.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(20)]</text>
+<polygon fill="none" stroke="black" points="1358,-1671 1358,-1696 1666,-1696 
1666,-1671 1358,-1671"/>
+<text text-anchor="start" x="1363" y="-1680.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">task_display_name</text>
+<text text-anchor="start" x="1495" y="-1680.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1500" y="-1680.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(2000)]</text>
+<polygon fill="none" stroke="black" points="1358,-1646 1358,-1671 1666,-1671 
1666,-1646 1358,-1646"/>
+<text text-anchor="start" x="1363" y="-1655.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">task_id</text>
+<text text-anchor="start" x="1412" y="-1655.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1417" y="-1655.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(250)]</text>
+<text text-anchor="start" x="1538" y="-1655.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="1358,-1621 1358,-1646 1666,-1646 
1666,-1621 1358,-1621"/>
+<text text-anchor="start" x="1363" y="-1630.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">trigger_id</text>
+<text text-anchor="start" x="1430" y="-1630.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1435" y="-1630.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
+<polygon fill="none" stroke="black" points="1358,-1596 1358,-1621 1666,-1621 
1666,-1596 1358,-1596"/>
+<text text-anchor="start" x="1363" y="-1605.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">trigger_timeout</text>
+<text text-anchor="start" x="1471" y="-1605.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1476" y="-1605.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
+<polygon fill="none" stroke="black" points="1358,-1571 1358,-1596 1666,-1596 
1666,-1571 1358,-1571"/>
+<text text-anchor="start" x="1363" y="-1580.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">try_number</text>
+<text text-anchor="start" x="1445" y="-1580.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1450" y="-1580.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
+<text text-anchor="start" x="1527" y="-1580.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="1358,-1546 1358,-1571 1666,-1571 
1666,-1546 1358,-1546"/>
+<text text-anchor="start" x="1363" y="-1555.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">unixname</text>
+<text text-anchor="start" x="1433" y="-1555.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1438" y="-1555.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(1000)]</text>
+<polygon fill="none" stroke="black" points="1358,-1521 1358,-1546 1666,-1546 
1666,-1521 1358,-1521"/>
+<text text-anchor="start" x="1363" y="-1530.8" 
font-family="Helvetica,sans-Serif" font-size="14.00">updated_at</text>
+<text text-anchor="start" x="1442" y="-1530.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1447" y="-1530.8" 
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
 </g>
 <!-- task_instance&#45;&#45;task_instance_history -->
 <g id="edge50" class="edge">
 <title>task_instance&#45;&#45;task_instance_history</title>
-<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" 
d="M1263.15,-2532.36C1267.82,-2520.44 1272.45,-2508.63 1277,-2497 
1300.65,-2436.57 1325.57,-2372.69 1349.97,-2310.33"/>
-<text text-anchor="start" x="1318.97" y="-2299.13" font-family="Times,serif" 
font-size="14.00">0..N</text>
-<text text-anchor="start" x="1253.15" y="-2521.16" font-family="Times,serif" 
font-size="14.00">1</text>
+<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" 
d="M1265,-2536.49C1292.94,-2465.4 1321.99,-2392.03 1349.92,-2322"/>
+<text text-anchor="start" x="1318.92" y="-2310.8" font-family="Times,serif" 
font-size="14.00">0..N</text>
+<text text-anchor="start" x="1265" y="-2525.29" font-family="Times,serif" 
font-size="14.00">1</text>
 </g>
 <!-- task_instance&#45;&#45;task_instance_history -->
 <g id="edge51" class="edge">
 <title>task_instance&#45;&#45;task_instance_history</title>
-<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" 
d="M1265.1,-2545.38C1269.11,-2535.15 1273.08,-2525.01 1277,-2515 
1300.65,-2454.57 1325.57,-2390.69 1349.97,-2328.12"/>
-<text text-anchor="start" x="1318.97" y="-2316.92" font-family="Times,serif" 
font-size="14.00">0..N</text>
-<text text-anchor="start" x="1265.1" y="-2534.18" font-family="Times,serif" 
font-size="14.00">1</text>
+<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" 
d="M1265,-2549.72C1292.94,-2478.99 1321.99,-2405.61 1349.92,-2335.23"/>
+<text text-anchor="start" x="1318.92" y="-2339.03" font-family="Times,serif" 
font-size="14.00">0..N</text>
+<text text-anchor="start" x="1265" y="-2553.52" font-family="Times,serif" 
font-size="14.00">1</text>
 </g>
 <!-- task_instance&#45;&#45;task_instance_history -->
 <g id="edge52" class="edge">
 <title>task_instance&#45;&#45;task_instance_history</title>
-<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" 
d="M1265.1,-2563.38C1269.11,-2553.15 1273.08,-2543.01 1277,-2533 
1300.65,-2472.57 1325.57,-2408.69 1349.97,-2345.91"/>
-<text text-anchor="start" x="1318.97" y="-2334.71" font-family="Times,serif" 
font-size="14.00">0..N</text>
-<text text-anchor="start" x="1265.1" y="-2552.18" font-family="Times,serif" 
font-size="14.00">1</text>
+<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" 
d="M1265,-2562.96C1292.94,-2492.58 1321.99,-2419.2 1349.92,-2348.47"/>
+<text text-anchor="start" x="1318.92" y="-2352.27" font-family="Times,serif" 
font-size="14.00">0..N</text>
+<text text-anchor="start" x="1265" y="-2566.76" font-family="Times,serif" 
font-size="14.00">1</text>
 </g>
 <!-- task_instance&#45;&#45;task_instance_history -->
 <g id="edge53" class="edge">
 <title>task_instance&#45;&#45;task_instance_history</title>
-<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" 
d="M1265.1,-2581.38C1269.11,-2571.15 1273.08,-2561.01 1277,-2551 
1302.03,-2487.04 1328.48,-2419.23 1354.24,-2352.7"/>
-<text text-anchor="start" x="1323.24" y="-2356.5" font-family="Times,serif" 
font-size="14.00">0..N</text>
-<text text-anchor="start" x="1265.1" y="-2570.18" font-family="Times,serif" 
font-size="14.00">1</text>
+<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" 
d="M1265,-2576.19C1294.05,-2503.38 1324.3,-2426.95 1353.25,-2353.24"/>
+<text text-anchor="start" x="1353.25" y="-2357.04" font-family="Times,serif" 
font-size="14.00">0..N</text>
+<text text-anchor="start" x="1265" y="-2579.99" font-family="Times,serif" 
font-size="14.00">1</text>
 </g>
 <!-- backfill -->
 <g id="node33" class="node">
diff --git a/docs/apache-airflow/migrations-ref.rst 
b/docs/apache-airflow/migrations-ref.rst
index f3441ceaf72..f133a67e08e 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are 
executed via when you ru
 
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
 | Revision ID             | Revises ID       | Airflow Version   | Description 
                                                 |
 
+=========================+==================+===================+==============================================================+
-| ``5f57a45b8433`` (head) | ``486ac7936b78`` | ``3.0.0``         | Drop 
task_fail table.                                        |
+| ``d8cd3297971e`` (head) | ``5f57a45b8433`` | ``3.0.0``         | Add 
last_heartbeat_at directly to TI.                        |
++-------------------------+------------------+-------------------+--------------------------------------------------------------+
+| ``5f57a45b8433``        | ``486ac7936b78`` | ``3.0.0``         | Drop 
task_fail table.                                        |
 
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
 | ``486ac7936b78``        | ``d59cbbef95eb`` | ``3.0.0``         | remove 
scheduler_lock column.                                |
 
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
diff --git a/scripts/ci/pre_commit/check_ti_vs_tis_attributes.py 
b/scripts/ci/pre_commit/check_ti_vs_tis_attributes.py
index 1dfc51a0a04..16c1df48a9e 100755
--- a/scripts/ci/pre_commit/check_ti_vs_tis_attributes.py
+++ b/scripts/ci/pre_commit/check_ti_vs_tis_attributes.py
@@ -52,6 +52,8 @@ def compare_attributes(path1, path2):
         "triggerer_job",
         "note",
         "rendered_task_instance_fields",
+        # Storing last heartbeat for historic TIs is not interesting/useful
+        "last_heartbeat_at",
     }  # exclude attrs not necessary to be in TaskInstanceHistory
     if not diff:
         return
diff --git 
a/tests/api_connexion/endpoints/test_mapped_task_instance_endpoint.py 
b/tests/api_connexion/endpoints/test_mapped_task_instance_endpoint.py
index 237ef5910c7..68ecd1e8389 100644
--- a/tests/api_connexion/endpoints/test_mapped_task_instance_endpoint.py
+++ b/tests/api_connexion/endpoints/test_mapped_task_instance_endpoint.py
@@ -77,7 +77,6 @@ class TestMappedTaskInstanceEndpoint:
             "duration": 10000,
             "pool": "default_pool",
             "queue": "default_queue",
-            "job_id": 0,
         }
         self.app = configured_app
         self.client = self.app.test_client()  # type:ignore
diff --git a/tests/api_connexion/endpoints/test_task_instance_endpoint.py 
b/tests/api_connexion/endpoints/test_task_instance_endpoint.py
index bc8836981d4..e1fa6d13b74 100644
--- a/tests/api_connexion/endpoints/test_task_instance_endpoint.py
+++ b/tests/api_connexion/endpoints/test_task_instance_endpoint.py
@@ -81,7 +81,6 @@ class TestTaskInstanceEndpoint:
             "duration": 10000,
             "pool": "default_pool",
             "queue": "default_queue",
-            "job_id": 0,
         }
         self.app = configured_app
         self.client = self.app.test_client()  # type:ignore
diff --git a/tests/api_fastapi/core_api/routes/public/test_task_instances.py 
b/tests/api_fastapi/core_api/routes/public/test_task_instances.py
index fa9cc0b161d..717f17ca278 100644
--- a/tests/api_fastapi/core_api/routes/public/test_task_instances.py
+++ b/tests/api_fastapi/core_api/routes/public/test_task_instances.py
@@ -68,7 +68,6 @@ class TestTaskInstanceEndpoint:
             "duration": 10000,
             "pool": "default_pool",
             "queue": "default_queue",
-            "job_id": 0,
         }
         clear_db_runs()
         clear_rendered_ti_fields()
diff --git a/tests/assets/test_manager.py b/tests/assets/test_manager.py
index eb12f281606..3310502a97b 100644
--- a/tests/assets/test_manager.py
+++ b/tests/assets/test_manager.py
@@ -59,6 +59,7 @@ def clear_assets():
 @pytest.fixture
 def mock_task_instance():
     return TaskInstancePydantic(
+        id="1",
         task_id="5",
         dag_id="7",
         run_id="11",
diff --git a/tests/cli/commands/test_task_command.py 
b/tests/cli/commands/test_task_command.py
index 9b605e818d8..5a4e0b27924 100644
--- a/tests/cli/commands/test_task_command.py
+++ b/tests/cli/commands/test_task_command.py
@@ -932,7 +932,7 @@ class TestLogsfromTaskRunCommand:
         print(logs)  # In case of a test failures this line would show 
detailed log
         logs_list = logs.splitlines()
 
-        assert f"Subtask {self.task_id}" in logs
+        assert f"Task {self.task_id}" in logs
         assert "standard_task_runner.py" in logs
         self.assert_log_line("Log from DAG Logger", logs_list)
         self.assert_log_line("Log from TI Logger", logs_list)
diff --git a/tests/executors/test_debug_executor.py 
b/tests/executors/test_debug_executor.py
index 20ee821842c..a8ad6679576 100644
--- a/tests/executors/test_debug_executor.py
+++ b/tests/executors/test_debug_executor.py
@@ -50,7 +50,7 @@ class TestDebugExecutor:
         succeeded = executor._run_task(task_instance_mock)
 
         assert succeeded
-        task_instance_mock.run.assert_called_once_with(job_id=job_id)
+        task_instance_mock.run.assert_called()
 
     def test_queue_task_instance(self):
         key = "ti_key"
diff --git a/tests/jobs/test_local_task_job.py 
b/tests/jobs/test_local_task_job.py
index 84a7465a823..7ee03747883 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -131,17 +131,21 @@ class TestLocalTaskJob:
         assert all(check_result_2)
 
     @pytest.mark.skip_if_database_isolation_mode  # Does not work in db 
isolation mode
-    def test_localtaskjob_heartbeat(self, dag_maker):
+    def test_localtaskjob_heartbeat(self, dag_maker, time_machine):
         session = settings.Session()
         with dag_maker("test_localtaskjob_heartbeat"):
             op1 = EmptyOperator(task_id="op1")
 
+        time_machine.move_to(DEFAULT_DATE, tick=False)
+
         dr = dag_maker.create_dagrun()
         ti = dr.get_task_instance(task_id=op1.task_id, session=session)
         ti.state = State.RUNNING
         ti.hostname = "blablabla"
         session.commit()
 
+        assert ti.last_heartbeat_at is None, "Pre-conditioncheck"
+
         job1 = Job(dag_id=ti.dag_id, executor=SequentialExecutor())
         job_runner = LocalTaskJobRunner(job=job1, task_instance=ti, 
ignore_ti_state=True)
         ti.task = op1
@@ -149,9 +153,12 @@ class TestLocalTaskJob:
         job1.task_runner = StandardTaskRunner(job_runner)
         job1.task_runner.process = mock.Mock()
         job_runner.task_runner = job1.task_runner
-        with pytest.raises(AirflowException):
+        with pytest.raises(AirflowException, match="Hostname .* does not 
match"):
             job_runner.heartbeat_callback()
 
+        ti = session.get(TaskInstance, (ti.id,))
+        assert ti.last_heartbeat_at is None, "Should still be none"
+
         job1.task_runner.process.pid = 1
         ti.state = State.RUNNING
         ti.hostname = get_hostname()
@@ -164,19 +171,22 @@ class TestLocalTaskJob:
         job_runner.heartbeat_callback(session=None)
 
         job1.task_runner.process.pid = 2
-        with pytest.raises(AirflowException):
+        with pytest.raises(AirflowException, match="PID .* does not match"):
             job_runner.heartbeat_callback()
 
         # Now, set the ti.pid to None and test that no error
         # is raised.
         ti.pid = None
-        session.merge(ti)
+        ti = session.merge(ti)
         session.commit()
         assert ti.pid != job1.task_runner.process.pid
         assert not ti.run_as_user
         assert not job1.task_runner.run_as_user
         job_runner.heartbeat_callback()
 
+        ti = session.get(TaskInstance, (ti.id,))
+        assert ti.last_heartbeat_at == DEFAULT_DATE
+
     @pytest.mark.skip_if_database_isolation_mode  # Does not work in db 
isolation mode
     @mock.patch("subprocess.check_call")
     @mock.patch("airflow.jobs.local_task_job_runner.psutil")
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 311de0ce2b6..da3ccc201eb 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -50,7 +50,6 @@ from airflow.executors.base_executor import BaseExecutor
 from airflow.executors.executor_constants import MOCK_EXECUTOR
 from airflow.executors.executor_loader import ExecutorLoader
 from airflow.jobs.job import Job, run_job
-from airflow.jobs.local_task_job_runner import LocalTaskJobRunner
 from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
 from airflow.models.asset import AssetActive, AssetDagRunQueue, AssetEvent, 
AssetModel
 from airflow.models.backfill import Backfill, _create_backfill
@@ -68,7 +67,7 @@ from airflow.timetables.base import DataInterval
 from airflow.utils import timezone
 from airflow.utils.file import list_py_file_paths
 from airflow.utils.session import create_session, provide_session
-from airflow.utils.state import DagRunState, JobState, State, TaskInstanceState
+from airflow.utils.state import DagRunState, State, TaskInstanceState
 from airflow.utils.types import DagRunType
 
 from tests.listeners import dag_listener
@@ -5665,16 +5664,10 @@ class TestSchedulerJob:
             for task_id in tasks_to_setup:
                 task = dag.get_task(task_id=task_id)
                 ti = TaskInstance(task, run_id=dag_run.run_id, 
state=State.RUNNING)
-                ti.queued_by_job_id = 999
-
-                local_job = Job(dag_id=ti.dag_id)
-                LocalTaskJobRunner(job=local_job, task_instance=ti)
-                local_job.state = TaskInstanceState.FAILED
 
-                session.add(local_job)
-                session.flush()
+                ti.last_heartbeat_at = timezone.utcnow() - timedelta(minutes=6)
+                ti.queued_by_job_id = 999
 
-                ti.job_id = local_job.id
                 session.add(ti)
                 session.flush()
 
@@ -5733,13 +5726,6 @@ class TestSchedulerJob:
                 ti = TaskInstance(task, run_id=dag_run.run_id, 
state=State.RUNNING)
                 ti.queued_by_job_id = 999
 
-                local_job = Job(dag_id=ti.dag_id)
-                local_job.state = TaskInstanceState.FAILED
-
-                session.add(local_job)
-                session.flush()
-
-                ti.job_id = local_job.id
                 session.add(ti)
                 session.flush()
 
@@ -5795,17 +5781,11 @@ class TestSchedulerJob:
             task = dag.get_task(task_id="run_this_last")
 
             ti = TaskInstance(task, run_id=dag_run.run_id, state=State.RUNNING)
-
-            local_job = Job(dag_id=ti.dag_id)
-            LocalTaskJobRunner(job=local_job, task_instance=ti)
-            local_job.state = JobState.FAILED
-            session.add(local_job)
-            session.flush()
+            ti.last_heartbeat_at = timezone.utcnow() - timedelta(minutes=6)
 
             # TODO: If there was an actual Relationship between TI and Job
             # we wouldn't need this extra commit
             session.add(ti)
-            ti.job_id = local_job.id
             session.flush()
 
         scheduler_job = Job()
diff --git a/tests/models/test_taskinstance.py 
b/tests/models/test_taskinstance.py
index ccd19ad3272..8a1df0594e4 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -3993,7 +3993,6 @@ class TestTaskInstance:
             "hostname": "some_unique_hostname",
             "id": str(uuid6.uuid7()),
             "unixname": "some_unique_unixname",
-            "job_id": 1234,
             "pool": "some_fake_pool_id",
             "pool_slots": 25,
             "queue": "some_queue_id",
@@ -4004,6 +4003,7 @@ class TestTaskInstance:
             "rendered_map_index": None,
             "queued_by_job_id": 321,
             "pid": 123,
+            "last_heartbeat_at": run_date + datetime.timedelta(hours=1, 
seconds=4),
             "executor": "some_executor",
             "executor_config": {"Some": {"extra": "information"}},
             "external_executor_id": "some_executor_id",
diff --git a/tests/www/views/test_views_tasks.py 
b/tests/www/views/test_views_tasks.py
index fabd104e8c2..19caafe55bc 100644
--- a/tests/www/views/test_views_tasks.py
+++ b/tests/www/views/test_views_tasks.py
@@ -1109,7 +1109,7 @@ def test_task_instances(admin_client):
             "external_executor_id": None,
             "hostname": "",
             "id": unittest.mock.ANY,  # Ignore the `id` field
-            "job_id": None,
+            "last_heartbeat_at": None,
             "map_index": -1,
             "max_tries": 0,
             "next_kwargs": None,
@@ -1145,7 +1145,7 @@ def test_task_instances(admin_client):
             "external_executor_id": None,
             "hostname": "",
             "id": unittest.mock.ANY,  # Ignore the `id` field
-            "job_id": None,
+            "last_heartbeat_at": None,
             "map_index": -1,
             "max_tries": 0,
             "next_kwargs": None,
@@ -1181,7 +1181,7 @@ def test_task_instances(admin_client):
             "external_executor_id": None,
             "hostname": "",
             "id": unittest.mock.ANY,  # Ignore the `id` field
-            "job_id": None,
+            "last_heartbeat_at": None,
             "map_index": -1,
             "max_tries": 0,
             "next_kwargs": None,
@@ -1217,7 +1217,7 @@ def test_task_instances(admin_client):
             "external_executor_id": None,
             "hostname": "",
             "id": unittest.mock.ANY,  # Ignore the `id` field
-            "job_id": None,
+            "last_heartbeat_at": None,
             "map_index": -1,
             "max_tries": 0,
             "next_kwargs": None,
@@ -1253,7 +1253,7 @@ def test_task_instances(admin_client):
             "external_executor_id": None,
             "hostname": "",
             "id": unittest.mock.ANY,  # Ignore the `id` field
-            "job_id": None,
+            "last_heartbeat_at": None,
             "map_index": -1,
             "max_tries": 0,
             "next_kwargs": None,
@@ -1289,7 +1289,7 @@ def test_task_instances(admin_client):
             "external_executor_id": None,
             "hostname": "",
             "id": unittest.mock.ANY,  # Ignore the `id` field
-            "job_id": None,
+            "last_heartbeat_at": None,
             "map_index": -1,
             "max_tries": 0,
             "next_kwargs": None,
@@ -1325,7 +1325,7 @@ def test_task_instances(admin_client):
             "external_executor_id": None,
             "hostname": "",
             "id": unittest.mock.ANY,  # Ignore the `id` field
-            "job_id": None,
+            "last_heartbeat_at": None,
             "map_index": -1,
             "max_tries": 0,
             "next_kwargs": None,

Reply via email to