This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 6fd7052f863 Move TaskInstance heartbeat directly on to TI row, not on
Job row (#43599)
6fd7052f863 is described below
commit 6fd7052f863ad9fc95ea4b82f8993fc5858d0dc3
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Sun Nov 3 01:04:51 2024 +0000
Move TaskInstance heartbeat directly on to TI row, not on Job row (#43599)
This is part of the work for the AIP-72 epic, but is done as a separate PR
for ease of review.
This PR by itself doesn't remove the LocalTaskJob row (that will happen in a
future PR when the execution code is moved over to live in the TaskSDK), but
it paves the way for it. The reasons we are making this change are:
- Having a separate row for tracking TI heartbeat is not really buying us
much
- With the addition of TaskInstanceHistory we don't need _another_ separate
record of when/where TIs were run
- It simplifies things (one fewer join when finding zombies)
- Makes zombie tracking easier -- it now depends only on the TI state, not on
the combination of TI and Job state.
---
airflow/cli/cli_config.py | 2 -
airflow/cli/commands/task_command.py | 1 -
airflow/executors/debug_executor.py | 2 +-
airflow/jobs/local_task_job_runner.py | 3 +-
airflow/jobs/scheduler_job_runner.py | 18 +-
...5_3_0_0_add_last_heartbeat_at_directly_to_ti.py | 60 ++++
airflow/models/taskinstance.py | 54 ++--
airflow/models/taskinstancehistory.py | 1 -
airflow/serialization/pydantic/taskinstance.py | 16 +-
airflow/task/standard_task_runner.py | 23 +-
airflow/utils/db.py | 2 +-
airflow/www/views.py | 4 +-
docs/apache-airflow/img/airflow_erd.sha256 | 2 +-
docs/apache-airflow/img/airflow_erd.svg | 310 ++++++++++-----------
docs/apache-airflow/migrations-ref.rst | 4 +-
.../ci/pre_commit/check_ti_vs_tis_attributes.py | 2 +
.../test_mapped_task_instance_endpoint.py | 1 -
.../endpoints/test_task_instance_endpoint.py | 1 -
.../core_api/routes/public/test_task_instances.py | 1 -
tests/assets/test_manager.py | 1 +
tests/cli/commands/test_task_command.py | 2 +-
tests/executors/test_debug_executor.py | 2 +-
tests/jobs/test_local_task_job.py | 18 +-
tests/jobs/test_scheduler_job.py | 28 +-
tests/models/test_taskinstance.py | 2 +-
tests/www/views/test_views_tasks.py | 14 +-
26 files changed, 294 insertions(+), 280 deletions(-)
diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py
index 15543023cbf..06ac2f7bd81 100644
--- a/airflow/cli/cli_config.py
+++ b/airflow/cli/cli_config.py
@@ -580,7 +580,6 @@ ARG_SHIP_DAG = Arg(
("--ship-dag",), help="Pickles (serializes) the DAG and ships it to the
worker", action="store_true"
)
ARG_PICKLE = Arg(("-p", "--pickle"), help="Serialized pickle object of the
entire dag (used internally)")
-ARG_JOB_ID = Arg(("-j", "--job-id"), help=argparse.SUPPRESS)
ARG_CFG_PATH = Arg(("--cfg-path",), help="Path to config file to use instead
of airflow.cfg")
ARG_MAP_INDEX = Arg(("--map-index",), type=int, default=-1, help="Mapped task
index")
ARG_READ_FROM_DB = Arg(("--read-from-db",), help="Read dag from DB instead of
dag file", action="store_true")
@@ -1354,7 +1353,6 @@ TASKS_COMMANDS = (
ARG_DEPENDS_ON_PAST,
ARG_SHIP_DAG,
ARG_PICKLE,
- ARG_JOB_ID,
ARG_INTERACTIVE,
ARG_SHUT_DOWN_LOGGING,
ARG_MAP_INDEX,
diff --git a/airflow/cli/commands/task_command.py
b/airflow/cli/commands/task_command.py
index 23f6e1abbe5..03d2737072f 100644
--- a/airflow/cli/commands/task_command.py
+++ b/airflow/cli/commands/task_command.py
@@ -341,7 +341,6 @@ def _run_raw_task(args, ti: TaskInstance) -> None |
TaskReturnCode:
"""Run the main task handling code."""
return ti._run_raw_task(
mark_success=args.mark_success,
- job_id=args.job_id,
pool=args.pool,
)
diff --git a/airflow/executors/debug_executor.py
b/airflow/executors/debug_executor.py
index 80fb673cab8..aead7e2b2c1 100644
--- a/airflow/executors/debug_executor.py
+++ b/airflow/executors/debug_executor.py
@@ -84,7 +84,7 @@ class DebugExecutor(BaseExecutor):
key = ti.key
try:
params = self.tasks_params.pop(ti.key, {})
- ti.run(job_id=ti.job_id, **params)
+ ti.run(**params)
self.success(key)
return True
except Exception as e:
diff --git a/airflow/jobs/local_task_job_runner.py
b/airflow/jobs/local_task_job_runner.py
index a33005b0a52..c900c88674e 100644
--- a/airflow/jobs/local_task_job_runner.py
+++ b/airflow/jobs/local_task_job_runner.py
@@ -159,7 +159,6 @@ class LocalTaskJobRunner(BaseJobRunner, LoggingMixin):
wait_for_past_depends_before_skipping=self.wait_for_past_depends_before_skipping,
ignore_task_deps=self.ignore_task_deps,
ignore_ti_state=self.ignore_ti_state,
- job_id=str(self.job.id),
pool=self.pool,
external_executor_id=self.external_executor_id,
):
@@ -319,6 +318,8 @@ class LocalTaskJobRunner(BaseJobRunner, LoggingMixin):
"Recorded pid %s does not match the current pid %s",
recorded_pid, current_pid
)
raise AirflowException("PID of job runner does not match")
+ ti.update_heartbeat()
+
elif self.task_runner.return_code() is None and
hasattr(self.task_runner, "process"):
self._overtime = (timezone.utcnow() - (ti.end_date or
timezone.utcnow())).total_seconds()
if ti.state == TaskInstanceState.SKIPPED:
diff --git a/airflow/jobs/scheduler_job_runner.py
b/airflow/jobs/scheduler_job_runner.py
index b763011e550..39e4e35087b 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -30,7 +30,7 @@ from functools import lru_cache, partial
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable, Iterator
-from sqlalchemy import and_, delete, exists, func, not_, or_, select, text,
update
+from sqlalchemy import and_, delete, exists, func, not_, select, text, update
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import lazyload, load_only, make_transient, selectinload
from sqlalchemy.sql import expression
@@ -777,7 +777,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
"TaskInstance Finished: dag_id=%s, task_id=%s, run_id=%s,
map_index=%s, "
"run_start_date=%s, run_end_date=%s, "
"run_duration=%s, state=%s, executor=%s, executor_state=%s,
try_number=%s, max_tries=%s, "
- "job_id=%s, pool=%s, queue=%s, priority_weight=%d,
operator=%s, queued_dttm=%s, "
+ "pool=%s, queue=%s, priority_weight=%d, operator=%s,
queued_dttm=%s, "
"queued_by_job_id=%s, pid=%s"
)
cls.logger().info(
@@ -794,7 +794,6 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
state,
try_number,
ti.max_tries,
- ti.job_id,
ti.pool,
ti.queue,
ti.priority_weight,
@@ -821,7 +820,6 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
span.set_attribute("operator", str(ti.operator))
span.set_attribute("try_number", ti.try_number)
span.set_attribute("executor_state", state)
- span.set_attribute("job_id", ti.job_id)
span.set_attribute("pool", ti.pool)
span.set_attribute("queue", ti.queue)
span.set_attribute("priority_weight", ti.priority_weight)
@@ -1977,22 +1975,20 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
self._purge_zombies(zombies, session=session)
def _find_zombies(self, *, session: Session) -> list[tuple[TI, str, str]]:
- from airflow.jobs.job import Job
-
self.log.debug("Finding 'running' jobs without a recent heartbeat")
limit_dttm = timezone.utcnow() -
timedelta(seconds=self._zombie_threshold_secs)
zombies = session.execute(
select(TI, DM.fileloc, DM.processor_subdir)
.with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")
- .join(Job, TI.job_id == Job.id)
.join(DM, TI.dag_id == DM.dag_id)
- .where(TI.state == TaskInstanceState.RUNNING)
- .where(or_(Job.state != JobState.RUNNING, Job.latest_heartbeat <
limit_dttm))
- .where(Job.job_type == "LocalTaskJob")
+ .where(
+ TI.state.in_((TaskInstanceState.RUNNING,
TaskInstanceState.RESTARTING)),
+ TI.last_heartbeat_at < limit_dttm,
+ )
.where(TI.queued_by_job_id == self.job.id)
).all()
if zombies:
- self.log.warning("Failing (%s) jobs without heartbeat after %s",
len(zombies), limit_dttm)
+ self.log.warning("Failing %s TIs without heartbeat after %s",
len(zombies), limit_dttm)
return zombies
def _purge_zombies(self, zombies: list[tuple[TI, str, str]], *, session:
Session) -> None:
diff --git
a/airflow/migrations/versions/0045_3_0_0_add_last_heartbeat_at_directly_to_ti.py
b/airflow/migrations/versions/0045_3_0_0_add_last_heartbeat_at_directly_to_ti.py
new file mode 100644
index 00000000000..47e72de9dcb
--- /dev/null
+++
b/airflow/migrations/versions/0045_3_0_0_add_last_heartbeat_at_directly_to_ti.py
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Add last_heartbeat_at directly to TI.
+
+Revision ID: d8cd3297971e
+Revises: 5f57a45b8433
+Create Date: 2024-11-01 12:14:59.927266
+
+"""
+
+from __future__ import annotations
+
+import sqlalchemy as sa
+from alembic import op
+
+from airflow.migrations.db_types import TIMESTAMP
+
+# revision identifiers, used by Alembic.
+revision = "d8cd3297971e"
+down_revision = "5f57a45b8433"
+branch_labels = None
+depends_on = None
+airflow_version = "3.0.0"
+
+
+def upgrade():
+ with op.batch_alter_table("task_instance", schema=None) as batch_op:
+ batch_op.add_column(sa.Column("last_heartbeat_at",
TIMESTAMP(timezone=True), nullable=True))
+ batch_op.drop_index("ti_job_id")
+ batch_op.create_index("ti_heartbeat", ["last_heartbeat_at"],
unique=False)
+ batch_op.drop_column("job_id")
+ with op.batch_alter_table("task_instance_history", schema=None) as
batch_op:
+ batch_op.drop_column("job_id")
+
+
+def downgrade():
+ with op.batch_alter_table("task_instance", schema=None) as batch_op:
+ batch_op.add_column(sa.Column("job_id", sa.INTEGER(),
autoincrement=False, nullable=True))
+ batch_op.drop_index("ti_heartbeat")
+ batch_op.create_index("ti_job_id", ["job_id"], unique=False)
+ batch_op.drop_column("last_heartbeat_at")
+ with op.batch_alter_table("task_instance_history", schema=None) as
batch_op:
+ batch_op.add_column(sa.Column("job_id", sa.INTEGER(),
autoincrement=False, nullable=True))
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index bb07ba6d848..e86c4777824 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -133,7 +133,7 @@ from airflow.utils.sqlalchemy import (
tuple_in_condition,
with_row_locks,
)
-from airflow.utils.state import DagRunState, JobState, State, TaskInstanceState
+from airflow.utils.state import DagRunState, State, TaskInstanceState
from airflow.utils.task_group import MappedTaskGroup
from airflow.utils.task_instance_session import
set_current_task_instance_session
from airflow.utils.timeout import timeout
@@ -221,11 +221,16 @@ def _add_log(
)
+@internal_api_call
+@provide_session
+def _update_ti_heartbeat(id: str, when: datetime, session: Session =
NEW_SESSION):
+ session.execute(update(TaskInstance).where(TaskInstance.id ==
id).values(last_heartbeat_at=when))
+
+
def _run_raw_task(
ti: TaskInstance | TaskInstancePydantic,
mark_success: bool = False,
test_mode: bool = False,
- job_id: str | None = None,
pool: str | None = None,
raise_on_defer: bool = False,
session: Session | None = None,
@@ -249,7 +254,6 @@ def _run_raw_task(
ti.test_mode = test_mode
ti.refresh_from_task(ti.task, pool_override=pool)
ti.refresh_from_db(session=session)
- ti.job_id = job_id
ti.hostname = get_hostname()
ti.pid = os.getpid()
if not test_mode:
@@ -451,7 +455,6 @@ def clear_task_instances(
If set to False, DagRuns state will not be changed.
:param dag: DAG object
"""
- job_ids = []
# Keys: dag_id -> run_id -> map_indexes -> try_numbers -> task_id
task_id_by_key: dict[str, dict[str, dict[int, dict[int, set[str]]]]] =
defaultdict(
lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(set)))
@@ -462,11 +465,9 @@ def clear_task_instances(
for ti in tis:
TaskInstanceHistory.record_ti(ti, session)
if ti.state == TaskInstanceState.RUNNING:
- if ti.job_id:
- # If a task is cleared when running, set its state to
RESTARTING so that
- # the task is terminated and becomes eligible for retry.
- ti.state = TaskInstanceState.RESTARTING
- job_ids.append(ti.job_id)
+ # If a task is cleared when running, set its state to RESTARTING
so that
+ # the task is terminated and becomes eligible for retry.
+ ti.state = TaskInstanceState.RESTARTING
else:
ti_dag = dag if dag and dag.dag_id == ti.dag_id else
dag_bag.get_dag(ti.dag_id, session=session)
task_id = ti.task_id
@@ -522,11 +523,6 @@ def clear_task_instances(
delete_qry = TR.__table__.delete().where(conditions)
session.execute(delete_qry)
- if job_ids:
- from airflow.jobs.job import Job
-
-
session.execute(update(Job).where(Job.id.in_(job_ids)).values(state=JobState.RESTARTING))
-
if dag_run_state is not False and tis:
from airflow.models.dagrun import DagRun # Avoid circular import
@@ -806,7 +802,6 @@ def _set_ti_attrs(target, source, include_dag_run=False):
target.max_tries = source.max_tries
target.hostname = source.hostname
target.unixname = source.unixname
- target.job_id = source.job_id
target.pool = source.pool
target.pool_slots = source.pool_slots or 1
target.queue = source.queue
@@ -815,6 +810,7 @@ def _set_ti_attrs(target, source, include_dag_run=False):
target.custom_operator_name = source.custom_operator_name
target.queued_dttm = source.queued_dttm
target.queued_by_job_id = source.queued_by_job_id
+ target.last_heartbeat_at = source.last_heartbeat_at
target.pid = source.pid
target.executor = source.executor
target.executor_config = source.executor_config
@@ -1844,7 +1840,6 @@ class TaskInstance(Base, LoggingMixin):
max_tries = Column(Integer, server_default=text("-1"))
hostname = Column(String(1000))
unixname = Column(String(1000))
- job_id = Column(Integer)
pool = Column(String(256), nullable=False)
pool_slots = Column(Integer, default=1, nullable=False)
queue = Column(String(256))
@@ -1853,6 +1848,8 @@ class TaskInstance(Base, LoggingMixin):
custom_operator_name = Column(String(1000))
queued_dttm = Column(UtcDateTime)
queued_by_job_id = Column(Integer)
+
+ last_heartbeat_at = Column(UtcDateTime)
pid = Column(Integer)
executor = Column(String(1000))
executor_config = Column(ExecutorConfigType(pickler=dill))
@@ -1885,8 +1882,8 @@ class TaskInstance(Base, LoggingMixin):
Index("ti_state", state),
Index("ti_state_lkp", dag_id, task_id, run_id, state),
Index("ti_pool", pool, state, priority_weight),
- Index("ti_job_id", job_id),
Index("ti_trigger_id", trigger_id),
+ Index("ti_heartbeat", last_heartbeat_at),
PrimaryKeyConstraint("id", name="task_instance_pkey"),
UniqueConstraint("dag_id", "task_id", "run_id", "map_index",
name="task_instance_composite_key"),
ForeignKeyConstraint(
@@ -2035,7 +2032,6 @@ class TaskInstance(Base, LoggingMixin):
local: bool = False,
pickle_id: int | None = None,
raw: bool = False,
- job_id: str | None = None,
pool: str | None = None,
cfg_path: str | None = None,
) -> list[str]:
@@ -2074,7 +2070,6 @@ class TaskInstance(Base, LoggingMixin):
pickle_id=pickle_id,
file_path=path,
raw=raw,
- job_id=job_id,
pool=pool,
cfg_path=cfg_path,
map_index=ti.map_index,
@@ -2091,7 +2086,6 @@ class TaskInstance(Base, LoggingMixin):
local: bool = False,
pickle_id: int | None = None,
raw: bool = False,
- job_id: str | None = None,
pool: str | None = None,
cfg_path: str | None = None,
) -> list[str]:
@@ -2111,7 +2105,6 @@ class TaskInstance(Base, LoggingMixin):
local=local,
pickle_id=pickle_id,
raw=raw,
- job_id=job_id,
pool=pool,
cfg_path=cfg_path,
)
@@ -2131,7 +2124,6 @@ class TaskInstance(Base, LoggingMixin):
pickle_id: int | None = None,
file_path: PurePath | str | None = None,
raw: bool = False,
- job_id: str | None = None,
pool: str | None = None,
cfg_path: str | None = None,
map_index: int = -1,
@@ -2156,7 +2148,6 @@ class TaskInstance(Base, LoggingMixin):
associated with the pickled DAG
:param file_path: path to the file containing the DAG definition
:param raw: raw mode (needs more details)
- :param job_id: job ID (needs more details)
:param pool: the Airflow pool that the task should run in
:param cfg_path: the Path to the configuration file
:return: shell command that can be used to run the task instance
@@ -2166,8 +2157,6 @@ class TaskInstance(Base, LoggingMixin):
cmd.extend(["--mark-success"])
if pickle_id:
cmd.extend(["--pickle", str(pickle_id)])
- if job_id:
- cmd.extend(["--job-id", str(job_id)])
if ignore_all_deps:
cmd.extend(["--ignore-all-dependencies"])
if ignore_task_deps:
@@ -2641,7 +2630,6 @@ class TaskInstance(Base, LoggingMixin):
mark_success: bool = False,
test_mode: bool = False,
hostname: str = "",
- job_id: str | None = None,
pool: str | None = None,
external_executor_id: str | None = None,
session: Session = NEW_SESSION,
@@ -2661,7 +2649,6 @@ class TaskInstance(Base, LoggingMixin):
:param mark_success: Don't run the task, mark its state as success
:param test_mode: Doesn't record success or failure in the DB
:param hostname: The hostname of the worker running the task instance.
- :param job_id: Job (LocalTaskJob / SchedulerJob) ID
:param pool: specifies the pool to use to run the task instance
:param external_executor_id: The identifier of the celery executor
:param session: SQLAlchemy ORM Session
@@ -2684,7 +2671,6 @@ class TaskInstance(Base, LoggingMixin):
ti.refresh_from_task(task, pool_override=pool)
ti.test_mode = test_mode
ti.refresh_from_db(session=session, lock_for_update=True)
- ti.job_id = job_id
ti.hostname = hostname
ti.pid = None
@@ -2789,7 +2775,6 @@ class TaskInstance(Base, LoggingMixin):
ignore_ti_state: bool = False,
mark_success: bool = False,
test_mode: bool = False,
- job_id: str | None = None,
pool: str | None = None,
external_executor_id: str | None = None,
session: Session = NEW_SESSION,
@@ -2805,7 +2790,6 @@ class TaskInstance(Base, LoggingMixin):
mark_success=mark_success,
test_mode=test_mode,
hostname=get_hostname(),
- job_id=job_id,
pool=pool,
external_executor_id=external_executor_id,
session=session,
@@ -2876,7 +2860,6 @@ class TaskInstance(Base, LoggingMixin):
self,
mark_success: bool = False,
test_mode: bool = False,
- job_id: str | None = None,
pool: str | None = None,
raise_on_defer: bool = False,
session: Session = NEW_SESSION,
@@ -2901,7 +2884,6 @@ class TaskInstance(Base, LoggingMixin):
ti=self,
mark_success=mark_success,
test_mode=test_mode,
- job_id=job_id,
pool=pool,
raise_on_defer=raise_on_defer,
session=session,
@@ -3071,6 +3053,11 @@ class TaskInstance(Base, LoggingMixin):
"""
return _execute_task(self, context, task_orig)
+ def update_heartbeat(self):
+ cm = nullcontext() if InternalApiConfig.get_use_internal_api() else
create_session()
+ with cm as session_or_null:
+ _update_ti_heartbeat(self.id, timezone.utcnow(), session_or_null)
+
@provide_session
def defer_task(self, exception: TaskDeferred | None, session: Session =
NEW_SESSION) -> None:
"""
@@ -3101,7 +3088,6 @@ class TaskInstance(Base, LoggingMixin):
ignore_ti_state: bool = False,
mark_success: bool = False,
test_mode: bool = False,
- job_id: str | None = None,
pool: str | None = None,
session: Session = NEW_SESSION,
raise_on_defer: bool = False,
@@ -3116,7 +3102,6 @@ class TaskInstance(Base, LoggingMixin):
ignore_ti_state=ignore_ti_state,
mark_success=mark_success,
test_mode=test_mode,
- job_id=job_id,
pool=pool,
session=session,
)
@@ -3126,7 +3111,6 @@ class TaskInstance(Base, LoggingMixin):
self._run_raw_task(
mark_success=mark_success,
test_mode=test_mode,
- job_id=job_id,
pool=pool,
session=session,
raise_on_defer=raise_on_defer,
diff --git a/airflow/models/taskinstancehistory.py
b/airflow/models/taskinstancehistory.py
index ccdca700af6..8c77daf9257 100644
--- a/airflow/models/taskinstancehistory.py
+++ b/airflow/models/taskinstancehistory.py
@@ -70,7 +70,6 @@ class TaskInstanceHistory(Base):
max_tries = Column(Integer, server_default=text("-1"))
hostname = Column(String(1000))
unixname = Column(String(1000))
- job_id = Column(Integer)
pool = Column(String(256), nullable=False)
pool_slots = Column(Integer, default=1, nullable=False)
queue = Column(String(256))
diff --git a/airflow/serialization/pydantic/taskinstance.py
b/airflow/serialization/pydantic/taskinstance.py
index caf44bea4c6..bf121353ca8 100644
--- a/airflow/serialization/pydantic/taskinstance.py
+++ b/airflow/serialization/pydantic/taskinstance.py
@@ -40,6 +40,7 @@ from airflow.models.taskinstance import (
)
from airflow.serialization.pydantic.dag import DagModelPydantic
from airflow.serialization.pydantic.dag_run import DagRunPydantic
+from airflow.utils import timezone
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.net import get_hostname
from airflow.utils.xcom import XCOM_RETURN_KEY
@@ -83,6 +84,7 @@ PydanticOperator = Annotated[
class TaskInstancePydantic(BaseModelPydantic, LoggingMixin):
"""Serializable representation of the TaskInstance ORM SqlAlchemyModel
used by internal API."""
+ id: str
task_id: str
dag_id: str
run_id: str
@@ -96,7 +98,6 @@ class TaskInstancePydantic(BaseModelPydantic, LoggingMixin):
max_tries: int
hostname: str
unixname: str
- job_id: Optional[int]
pool: str
pool_slots: int
queue: str
@@ -105,6 +106,7 @@ class TaskInstancePydantic(BaseModelPydantic, LoggingMixin):
custom_operator_name: Optional[str]
queued_dttm: Optional[datetime]
queued_by_job_id: Optional[int]
+ last_heartbeat_at: Optional[datetime] = None
pid: Optional[int]
executor: Optional[str]
executor_config: Any
@@ -138,7 +140,6 @@ class TaskInstancePydantic(BaseModelPydantic, LoggingMixin):
self,
mark_success: bool = False,
test_mode: bool = False,
- job_id: str | None = None,
pool: str | None = None,
raise_on_defer: bool = False,
session: Session | None = None,
@@ -147,7 +148,6 @@ class TaskInstancePydantic(BaseModelPydantic, LoggingMixin):
ti=self,
mark_success=mark_success,
test_mode=test_mode,
- job_id=job_id,
pool=pool,
raise_on_defer=raise_on_defer,
session=session,
@@ -252,6 +252,12 @@ class TaskInstancePydantic(BaseModelPydantic,
LoggingMixin):
_refresh_from_db(task_instance=self, session=session,
lock_for_update=lock_for_update)
+ def update_heartbeat(self):
+ """Update the recorded heartbeat for this task to "now"."""
+ from airflow.models.taskinstance import _update_ti_heartbeat
+
+ return _update_ti_heartbeat(self.id, timezone.utcnow())
+
def set_duration(self) -> None:
"""Set task instance duration."""
from airflow.models.taskinstance import _set_duration
@@ -441,7 +447,6 @@ class TaskInstancePydantic(BaseModelPydantic, LoggingMixin):
ignore_ti_state: bool = False,
mark_success: bool = False,
test_mode: bool = False,
- job_id: str | None = None,
pool: str | None = None,
external_executor_id: str | None = None,
session: Session | None = None,
@@ -457,7 +462,6 @@ class TaskInstancePydantic(BaseModelPydantic, LoggingMixin):
mark_success=mark_success,
test_mode=test_mode,
hostname=get_hostname(),
- job_id=job_id,
pool=pool,
external_executor_id=external_executor_id,
session=session,
@@ -484,7 +488,6 @@ class TaskInstancePydantic(BaseModelPydantic, LoggingMixin):
local: bool = False,
pickle_id: int | None = None,
raw: bool = False,
- job_id: str | None = None,
pool: str | None = None,
cfg_path: str | None = None,
) -> list[str]:
@@ -504,7 +507,6 @@ class TaskInstancePydantic(BaseModelPydantic, LoggingMixin):
local=local,
pickle_id=pickle_id,
raw=raw,
- job_id=job_id,
pool=pool,
cfg_path=cfg_path,
)
diff --git a/airflow/task/standard_task_runner.py
b/airflow/task/standard_task_runner.py
index d7f75f40e17..a5641002c96 100644
--- a/airflow/task/standard_task_runner.py
+++ b/airflow/task/standard_task_runner.py
@@ -101,7 +101,6 @@ class StandardTaskRunner(LoggingMixin):
raw=True,
pickle_id=self.job_runner.pickle_id,
mark_success=self.job_runner.mark_success,
- job_id=self.job_runner.job.id,
pool=self.job_runner.pool,
cfg_path=cfg_path,
)
@@ -159,15 +158,10 @@ class StandardTaskRunner(LoggingMixin):
# [1:] - remove "airflow" from the start of the command
args = parser.parse_args(self._command[1:])
- # We prefer the job_id passed on the command-line because at this
time, the
- # task instance may not have been updated.
- job_id = getattr(args, "job_id", self._task_instance.job_id)
self.log.info("Running: %s", self._command)
- self.log.info("Job %s: Subtask %s", job_id,
self._task_instance.task_id)
+ self.log.info("Subtask %s", self._task_instance.task_id)
proc_title = "airflow task runner: {0.dag_id} {0.task_id}
{0.execution_date_or_run_id}"
- if job_id is not None:
- proc_title += " {0.job_id}"
setproctitle(proc_title.format(args))
return_code = 0
try:
@@ -179,15 +173,11 @@ class StandardTaskRunner(LoggingMixin):
return_code = 0
if isinstance(ret, TaskReturnCode):
return_code = ret.value
- except Exception as exc:
+ except Exception:
return_code = 1
self.log.exception(
- "Failed to execute job %s for task %s (%s; %r)",
- job_id,
- self._task_instance.task_id,
- exc,
- os.getpid(),
+ "Failed to execute task_id=%s pid=%r",
self._task_instance.task_id, os.getpid()
)
except SystemExit as sys_ex:
# Someone called sys.exit() in the fork - mistakenly. You
should not run sys.exit() in
@@ -250,10 +240,10 @@ class StandardTaskRunner(LoggingMixin):
if self._rc == -signal.SIGKILL:
self.log.error(
(
- "Job %s was killed before it finished (likely due to
running out of memory)",
+ "TI %s was killed before it finished (likely due to
running out of memory)",
"For more information, see
https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#LocalTaskJob-killed",
),
- self._task_instance.job_id,
+ self._task_instance.id,
)
def get_process_pid(self) -> int:
@@ -286,8 +276,7 @@ class StandardTaskRunner(LoggingMixin):
if not line:
break
self.log.info(
- "Job %s: Subtask %s %s",
- self._task_instance.job_id,
+ "Task %s %s",
self._task_instance.task_id,
line.rstrip("\n"),
)
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 2c82f5f1948..dd3e8c5d200 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -97,7 +97,7 @@ _REVISION_HEADS_MAP: dict[str, str] = {
"2.9.2": "686269002441",
"2.10.0": "22ed7efa9da2",
"2.10.3": "5f2621c13b39",
- "3.0.0": "5f57a45b8433",
+ "3.0.0": "d8cd3297971e",
}
diff --git a/airflow/www/views.py b/airflow/www/views.py
index d50d7bb2e78..e287c027a89 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -5121,7 +5121,6 @@ class TaskInstanceModelView(AirflowModelView):
"end_date",
"duration",
"note",
- "job_id",
"hostname",
"unixname",
"priority_weight",
@@ -5146,7 +5145,6 @@ class TaskInstanceModelView(AirflowModelView):
"end_date",
"duration",
# "note", # TODO: Maybe figure out how to re-enable this.
- "job_id",
"hostname",
"unixname",
"priority_weight",
@@ -5192,7 +5190,7 @@ class TaskInstanceModelView(AirflowModelView):
edit_form = TaskInstanceEditForm
- base_order = ("job_id", "asc")
+ base_order = ("queued_dttm", "asc")
base_filters = [["dag_id", DagFilter, list]]
diff --git a/docs/apache-airflow/img/airflow_erd.sha256
b/docs/apache-airflow/img/airflow_erd.sha256
index f278eee7d05..8adffd106ea 100644
--- a/docs/apache-airflow/img/airflow_erd.sha256
+++ b/docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
-9b9dcf915eff051a5cd77176a78bdcca3703b227373efe83fd0a1d4d05623c28
\ No newline at end of file
+1d781ee92cc59e7647d7f72ddc542b7f17e03fc8b822950db74415c38279d40f
\ No newline at end of file
diff --git a/docs/apache-airflow/img/airflow_erd.svg
b/docs/apache-airflow/img/airflow_erd.svg
index 177c5a60f14..1b0d5b346c9 100644
--- a/docs/apache-airflow/img/airflow_erd.svg
+++ b/docs/apache-airflow/img/airflow_erd.svg
@@ -1144,9 +1144,9 @@
<text text-anchor="start" x="1024" y="-3145.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
<text text-anchor="start" x="1029" y="-3145.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(1000)]</text>
<polygon fill="none" stroke="black" points="949,-3111 949,-3136 1257,-3136
1257,-3111 949,-3111"/>
-<text text-anchor="start" x="954" y="-3120.8"
font-family="Helvetica,sans-Serif" font-size="14.00">job_id</text>
-<text text-anchor="start" x="995" y="-3120.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1000" y="-3120.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
+<text text-anchor="start" x="954" y="-3120.8"
font-family="Helvetica,sans-Serif" font-size="14.00">last_heartbeat_at</text>
+<text text-anchor="start" x="1074" y="-3120.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1079" y="-3120.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
<polygon fill="none" stroke="black" points="949,-3086 949,-3111 1257,-3111
1257,-3086 949,-3086"/>
<text text-anchor="start" x="954" y="-3095.8"
font-family="Helvetica,sans-Serif" font-size="14.00">map_index</text>
<text text-anchor="start" x="1030" y="-3095.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
@@ -1708,176 +1708,172 @@
<!-- task_instance_history -->
<g id="node39" class="node">
<title>task_instance_history</title>
-<polygon fill="none" stroke="black" points="1358,-2320 1358,-2348 1666,-2348
1666,-2320 1358,-2320"/>
-<text text-anchor="start" x="1414.5" y="-2331.2"
font-family="Helvetica,sans-Serif" font-weight="bold"
font-size="16.00">task_instance_history</text>
-<polygon fill="none" stroke="black" points="1358,-2295 1358,-2320 1666,-2320
1666,-2295 1358,-2295"/>
-<text text-anchor="start" x="1363" y="-2304.8"
font-family="Helvetica,sans-Serif" text-decoration="underline"
font-size="14.00">id</text>
-<text text-anchor="start" x="1376" y="-2304.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1381" y="-2304.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
-<text text-anchor="start" x="1458" y="-2304.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="1358,-2270 1358,-2295 1666,-2295
1666,-2270 1358,-2270"/>
-<text text-anchor="start" x="1363" y="-2279.8"
font-family="Helvetica,sans-Serif" font-size="14.00">custom_operator_name</text>
-<text text-anchor="start" x="1526" y="-2279.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1531" y="-2279.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(1000)]</text>
-<polygon fill="none" stroke="black" points="1358,-2245 1358,-2270 1666,-2270
1666,-2245 1358,-2245"/>
-<text text-anchor="start" x="1363" y="-2254.8"
font-family="Helvetica,sans-Serif" font-size="14.00">dag_id</text>
-<text text-anchor="start" x="1409" y="-2254.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1414" y="-2254.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(250)]</text>
-<text text-anchor="start" x="1535" y="-2254.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="1358,-2220 1358,-2245 1666,-2245
1666,-2220 1358,-2220"/>
-<text text-anchor="start" x="1363" y="-2229.8"
font-family="Helvetica,sans-Serif" font-size="14.00">duration</text>
-<text text-anchor="start" x="1422" y="-2229.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1427" y="-2229.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [DOUBLE_PRECISION]</text>
-<polygon fill="none" stroke="black" points="1358,-2195 1358,-2220 1666,-2220
1666,-2195 1358,-2195"/>
-<text text-anchor="start" x="1363" y="-2204.8"
font-family="Helvetica,sans-Serif" font-size="14.00">end_date</text>
-<text text-anchor="start" x="1427" y="-2204.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1432" y="-2204.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
-<polygon fill="none" stroke="black" points="1358,-2170 1358,-2195 1666,-2195
1666,-2170 1358,-2170"/>
-<text text-anchor="start" x="1363" y="-2179.8"
font-family="Helvetica,sans-Serif" font-size="14.00">executor</text>
-<text text-anchor="start" x="1424" y="-2179.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1429" y="-2179.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(1000)]</text>
-<polygon fill="none" stroke="black" points="1358,-2145 1358,-2170 1666,-2170
1666,-2145 1358,-2145"/>
-<text text-anchor="start" x="1363" y="-2154.8"
font-family="Helvetica,sans-Serif" font-size="14.00">executor_config</text>
-<text text-anchor="start" x="1473" y="-2154.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1478" y="-2154.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [BYTEA]</text>
-<polygon fill="none" stroke="black" points="1358,-2120 1358,-2145 1666,-2145
1666,-2120 1358,-2120"/>
-<text text-anchor="start" x="1363" y="-2129.8"
font-family="Helvetica,sans-Serif" font-size="14.00">external_executor_id</text>
-<text text-anchor="start" x="1506" y="-2129.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1511" y="-2129.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(250)]</text>
-<polygon fill="none" stroke="black" points="1358,-2095 1358,-2120 1666,-2120
1666,-2095 1358,-2095"/>
-<text text-anchor="start" x="1363" y="-2104.8"
font-family="Helvetica,sans-Serif" font-size="14.00">hostname</text>
-<text text-anchor="start" x="1433" y="-2104.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1438" y="-2104.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(1000)]</text>
-<polygon fill="none" stroke="black" points="1358,-2070 1358,-2095 1666,-2095
1666,-2070 1358,-2070"/>
-<text text-anchor="start" x="1363" y="-2079.8"
font-family="Helvetica,sans-Serif" font-size="14.00">job_id</text>
-<text text-anchor="start" x="1404" y="-2079.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1409" y="-2079.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
-<polygon fill="none" stroke="black" points="1358,-2045 1358,-2070 1666,-2070
1666,-2045 1358,-2045"/>
-<text text-anchor="start" x="1363" y="-2054.8"
font-family="Helvetica,sans-Serif" font-size="14.00">map_index</text>
-<text text-anchor="start" x="1439" y="-2054.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1444" y="-2054.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
-<text text-anchor="start" x="1521" y="-2054.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="1358,-2020 1358,-2045 1666,-2045
1666,-2020 1358,-2020"/>
-<text text-anchor="start" x="1363" y="-2029.8"
font-family="Helvetica,sans-Serif" font-size="14.00">max_tries</text>
-<text text-anchor="start" x="1431" y="-2029.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1436" y="-2029.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
-<polygon fill="none" stroke="black" points="1358,-1995 1358,-2020 1666,-2020
1666,-1995 1358,-1995"/>
-<text text-anchor="start" x="1363" y="-2004.8"
font-family="Helvetica,sans-Serif" font-size="14.00">next_kwargs</text>
-<text text-anchor="start" x="1451" y="-2004.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1456" y="-2004.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [JSON]</text>
-<polygon fill="none" stroke="black" points="1358,-1970 1358,-1995 1666,-1995
1666,-1970 1358,-1970"/>
-<text text-anchor="start" x="1363" y="-1979.8"
font-family="Helvetica,sans-Serif" font-size="14.00">next_method</text>
-<text text-anchor="start" x="1454" y="-1979.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1459" y="-1979.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(1000)]</text>
-<polygon fill="none" stroke="black" points="1358,-1945 1358,-1970 1666,-1970
1666,-1945 1358,-1945"/>
-<text text-anchor="start" x="1363" y="-1954.8"
font-family="Helvetica,sans-Serif" font-size="14.00">operator</text>
-<text text-anchor="start" x="1423" y="-1954.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1428" y="-1954.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(1000)]</text>
-<polygon fill="none" stroke="black" points="1358,-1920 1358,-1945 1666,-1945
1666,-1920 1358,-1920"/>
-<text text-anchor="start" x="1363" y="-1929.8"
font-family="Helvetica,sans-Serif" font-size="14.00">pid</text>
-<text text-anchor="start" x="1385" y="-1929.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1390" y="-1929.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
-<polygon fill="none" stroke="black" points="1358,-1895 1358,-1920 1666,-1920
1666,-1895 1358,-1895"/>
-<text text-anchor="start" x="1363" y="-1904.8"
font-family="Helvetica,sans-Serif" font-size="14.00">pool</text>
-<text text-anchor="start" x="1393" y="-1904.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1398" y="-1904.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(256)]</text>
-<text text-anchor="start" x="1519" y="-1904.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="1358,-1870 1358,-1895 1666,-1895
1666,-1870 1358,-1870"/>
-<text text-anchor="start" x="1363" y="-1879.8"
font-family="Helvetica,sans-Serif" font-size="14.00">pool_slots</text>
-<text text-anchor="start" x="1432" y="-1879.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1437" y="-1879.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
-<text text-anchor="start" x="1514" y="-1879.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="1358,-1845 1358,-1870 1666,-1870
1666,-1845 1358,-1845"/>
-<text text-anchor="start" x="1363" y="-1854.8"
font-family="Helvetica,sans-Serif" font-size="14.00">priority_weight</text>
-<text text-anchor="start" x="1467" y="-1854.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1472" y="-1854.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
-<polygon fill="none" stroke="black" points="1358,-1820 1358,-1845 1666,-1845
1666,-1820 1358,-1820"/>
-<text text-anchor="start" x="1363" y="-1829.8"
font-family="Helvetica,sans-Serif" font-size="14.00">queue</text>
-<text text-anchor="start" x="1407" y="-1829.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1412" y="-1829.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(256)]</text>
-<polygon fill="none" stroke="black" points="1358,-1795 1358,-1820 1666,-1820
1666,-1795 1358,-1795"/>
-<text text-anchor="start" x="1363" y="-1804.8"
font-family="Helvetica,sans-Serif" font-size="14.00">queued_by_job_id</text>
-<text text-anchor="start" x="1487" y="-1804.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1492" y="-1804.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
-<polygon fill="none" stroke="black" points="1358,-1770 1358,-1795 1666,-1795
1666,-1770 1358,-1770"/>
-<text text-anchor="start" x="1363" y="-1779.8"
font-family="Helvetica,sans-Serif" font-size="14.00">queued_dttm</text>
-<text text-anchor="start" x="1456" y="-1779.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1461" y="-1779.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
-<polygon fill="none" stroke="black" points="1358,-1745 1358,-1770 1666,-1770
1666,-1745 1358,-1745"/>
-<text text-anchor="start" x="1363" y="-1754.8"
font-family="Helvetica,sans-Serif" font-size="14.00">rendered_map_index</text>
-<text text-anchor="start" x="1508" y="-1754.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1513" y="-1754.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(250)]</text>
-<polygon fill="none" stroke="black" points="1358,-1720 1358,-1745 1666,-1745
1666,-1720 1358,-1720"/>
-<text text-anchor="start" x="1363" y="-1729.8"
font-family="Helvetica,sans-Serif" font-size="14.00">run_id</text>
-<text text-anchor="start" x="1407" y="-1729.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1412" y="-1729.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(250)]</text>
-<text text-anchor="start" x="1533" y="-1729.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="1358,-1695 1358,-1720 1666,-1720
1666,-1695 1358,-1695"/>
-<text text-anchor="start" x="1363" y="-1704.8"
font-family="Helvetica,sans-Serif" font-size="14.00">start_date</text>
-<text text-anchor="start" x="1433" y="-1704.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1438" y="-1704.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
-<polygon fill="none" stroke="black" points="1358,-1670 1358,-1695 1666,-1695
1666,-1670 1358,-1670"/>
-<text text-anchor="start" x="1363" y="-1679.8"
font-family="Helvetica,sans-Serif" font-size="14.00">state</text>
-<text text-anchor="start" x="1398" y="-1679.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1403" y="-1679.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(20)]</text>
-<polygon fill="none" stroke="black" points="1358,-1645 1358,-1670 1666,-1670
1666,-1645 1358,-1645"/>
-<text text-anchor="start" x="1363" y="-1654.8"
font-family="Helvetica,sans-Serif" font-size="14.00">task_display_name</text>
-<text text-anchor="start" x="1495" y="-1654.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1500" y="-1654.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(2000)]</text>
-<polygon fill="none" stroke="black" points="1358,-1620 1358,-1645 1666,-1645
1666,-1620 1358,-1620"/>
-<text text-anchor="start" x="1363" y="-1629.8"
font-family="Helvetica,sans-Serif" font-size="14.00">task_id</text>
-<text text-anchor="start" x="1412" y="-1629.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1417" y="-1629.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(250)]</text>
-<text text-anchor="start" x="1538" y="-1629.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="1358,-1595 1358,-1620 1666,-1620
1666,-1595 1358,-1595"/>
-<text text-anchor="start" x="1363" y="-1604.8"
font-family="Helvetica,sans-Serif" font-size="14.00">trigger_id</text>
-<text text-anchor="start" x="1430" y="-1604.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1435" y="-1604.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
-<polygon fill="none" stroke="black" points="1358,-1570 1358,-1595 1666,-1595
1666,-1570 1358,-1570"/>
-<text text-anchor="start" x="1363" y="-1579.8"
font-family="Helvetica,sans-Serif" font-size="14.00">trigger_timeout</text>
-<text text-anchor="start" x="1471" y="-1579.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1476" y="-1579.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
-<polygon fill="none" stroke="black" points="1358,-1545 1358,-1570 1666,-1570
1666,-1545 1358,-1545"/>
-<text text-anchor="start" x="1363" y="-1554.8"
font-family="Helvetica,sans-Serif" font-size="14.00">try_number</text>
-<text text-anchor="start" x="1445" y="-1554.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1450" y="-1554.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
-<text text-anchor="start" x="1527" y="-1554.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
-<polygon fill="none" stroke="black" points="1358,-1520 1358,-1545 1666,-1545
1666,-1520 1358,-1520"/>
-<text text-anchor="start" x="1363" y="-1529.8"
font-family="Helvetica,sans-Serif" font-size="14.00">unixname</text>
-<text text-anchor="start" x="1433" y="-1529.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1438" y="-1529.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(1000)]</text>
-<polygon fill="none" stroke="black" points="1358,-1495 1358,-1520 1666,-1520
1666,-1495 1358,-1495"/>
-<text text-anchor="start" x="1363" y="-1504.8"
font-family="Helvetica,sans-Serif" font-size="14.00">updated_at</text>
-<text text-anchor="start" x="1442" y="-1504.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
-<text text-anchor="start" x="1447" y="-1504.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
+<polygon fill="none" stroke="black" points="1358,-2321 1358,-2349 1666,-2349
1666,-2321 1358,-2321"/>
+<text text-anchor="start" x="1414.5" y="-2332.2"
font-family="Helvetica,sans-Serif" font-weight="bold"
font-size="16.00">task_instance_history</text>
+<polygon fill="none" stroke="black" points="1358,-2296 1358,-2321 1666,-2321
1666,-2296 1358,-2296"/>
+<text text-anchor="start" x="1363" y="-2305.8"
font-family="Helvetica,sans-Serif" text-decoration="underline"
font-size="14.00">id</text>
+<text text-anchor="start" x="1376" y="-2305.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1381" y="-2305.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
+<text text-anchor="start" x="1458" y="-2305.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="1358,-2271 1358,-2296 1666,-2296
1666,-2271 1358,-2271"/>
+<text text-anchor="start" x="1363" y="-2280.8"
font-family="Helvetica,sans-Serif" font-size="14.00">custom_operator_name</text>
+<text text-anchor="start" x="1526" y="-2280.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1531" y="-2280.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(1000)]</text>
+<polygon fill="none" stroke="black" points="1358,-2246 1358,-2271 1666,-2271
1666,-2246 1358,-2246"/>
+<text text-anchor="start" x="1363" y="-2255.8"
font-family="Helvetica,sans-Serif" font-size="14.00">dag_id</text>
+<text text-anchor="start" x="1409" y="-2255.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1414" y="-2255.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(250)]</text>
+<text text-anchor="start" x="1535" y="-2255.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="1358,-2221 1358,-2246 1666,-2246
1666,-2221 1358,-2221"/>
+<text text-anchor="start" x="1363" y="-2230.8"
font-family="Helvetica,sans-Serif" font-size="14.00">duration</text>
+<text text-anchor="start" x="1422" y="-2230.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1427" y="-2230.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [DOUBLE_PRECISION]</text>
+<polygon fill="none" stroke="black" points="1358,-2196 1358,-2221 1666,-2221
1666,-2196 1358,-2196"/>
+<text text-anchor="start" x="1363" y="-2205.8"
font-family="Helvetica,sans-Serif" font-size="14.00">end_date</text>
+<text text-anchor="start" x="1427" y="-2205.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1432" y="-2205.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
+<polygon fill="none" stroke="black" points="1358,-2171 1358,-2196 1666,-2196
1666,-2171 1358,-2171"/>
+<text text-anchor="start" x="1363" y="-2180.8"
font-family="Helvetica,sans-Serif" font-size="14.00">executor</text>
+<text text-anchor="start" x="1424" y="-2180.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1429" y="-2180.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(1000)]</text>
+<polygon fill="none" stroke="black" points="1358,-2146 1358,-2171 1666,-2171
1666,-2146 1358,-2146"/>
+<text text-anchor="start" x="1363" y="-2155.8"
font-family="Helvetica,sans-Serif" font-size="14.00">executor_config</text>
+<text text-anchor="start" x="1473" y="-2155.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1478" y="-2155.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [BYTEA]</text>
+<polygon fill="none" stroke="black" points="1358,-2121 1358,-2146 1666,-2146
1666,-2121 1358,-2121"/>
+<text text-anchor="start" x="1363" y="-2130.8"
font-family="Helvetica,sans-Serif" font-size="14.00">external_executor_id</text>
+<text text-anchor="start" x="1506" y="-2130.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1511" y="-2130.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(250)]</text>
+<polygon fill="none" stroke="black" points="1358,-2096 1358,-2121 1666,-2121
1666,-2096 1358,-2096"/>
+<text text-anchor="start" x="1363" y="-2105.8"
font-family="Helvetica,sans-Serif" font-size="14.00">hostname</text>
+<text text-anchor="start" x="1433" y="-2105.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1438" y="-2105.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(1000)]</text>
+<polygon fill="none" stroke="black" points="1358,-2071 1358,-2096 1666,-2096
1666,-2071 1358,-2071"/>
+<text text-anchor="start" x="1363" y="-2080.8"
font-family="Helvetica,sans-Serif" font-size="14.00">map_index</text>
+<text text-anchor="start" x="1439" y="-2080.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1444" y="-2080.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
+<text text-anchor="start" x="1521" y="-2080.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="1358,-2046 1358,-2071 1666,-2071
1666,-2046 1358,-2046"/>
+<text text-anchor="start" x="1363" y="-2055.8"
font-family="Helvetica,sans-Serif" font-size="14.00">max_tries</text>
+<text text-anchor="start" x="1431" y="-2055.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1436" y="-2055.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
+<polygon fill="none" stroke="black" points="1358,-2021 1358,-2046 1666,-2046
1666,-2021 1358,-2021"/>
+<text text-anchor="start" x="1363" y="-2030.8"
font-family="Helvetica,sans-Serif" font-size="14.00">next_kwargs</text>
+<text text-anchor="start" x="1451" y="-2030.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1456" y="-2030.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [JSON]</text>
+<polygon fill="none" stroke="black" points="1358,-1996 1358,-2021 1666,-2021
1666,-1996 1358,-1996"/>
+<text text-anchor="start" x="1363" y="-2005.8"
font-family="Helvetica,sans-Serif" font-size="14.00">next_method</text>
+<text text-anchor="start" x="1454" y="-2005.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1459" y="-2005.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(1000)]</text>
+<polygon fill="none" stroke="black" points="1358,-1971 1358,-1996 1666,-1996
1666,-1971 1358,-1971"/>
+<text text-anchor="start" x="1363" y="-1980.8"
font-family="Helvetica,sans-Serif" font-size="14.00">operator</text>
+<text text-anchor="start" x="1423" y="-1980.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1428" y="-1980.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(1000)]</text>
+<polygon fill="none" stroke="black" points="1358,-1946 1358,-1971 1666,-1971
1666,-1946 1358,-1946"/>
+<text text-anchor="start" x="1363" y="-1955.8"
font-family="Helvetica,sans-Serif" font-size="14.00">pid</text>
+<text text-anchor="start" x="1385" y="-1955.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1390" y="-1955.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
+<polygon fill="none" stroke="black" points="1358,-1921 1358,-1946 1666,-1946
1666,-1921 1358,-1921"/>
+<text text-anchor="start" x="1363" y="-1930.8"
font-family="Helvetica,sans-Serif" font-size="14.00">pool</text>
+<text text-anchor="start" x="1393" y="-1930.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1398" y="-1930.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(256)]</text>
+<text text-anchor="start" x="1519" y="-1930.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="1358,-1896 1358,-1921 1666,-1921
1666,-1896 1358,-1896"/>
+<text text-anchor="start" x="1363" y="-1905.8"
font-family="Helvetica,sans-Serif" font-size="14.00">pool_slots</text>
+<text text-anchor="start" x="1432" y="-1905.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1437" y="-1905.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
+<text text-anchor="start" x="1514" y="-1905.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="1358,-1871 1358,-1896 1666,-1896
1666,-1871 1358,-1871"/>
+<text text-anchor="start" x="1363" y="-1880.8"
font-family="Helvetica,sans-Serif" font-size="14.00">priority_weight</text>
+<text text-anchor="start" x="1467" y="-1880.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1472" y="-1880.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
+<polygon fill="none" stroke="black" points="1358,-1846 1358,-1871 1666,-1871
1666,-1846 1358,-1846"/>
+<text text-anchor="start" x="1363" y="-1855.8"
font-family="Helvetica,sans-Serif" font-size="14.00">queue</text>
+<text text-anchor="start" x="1407" y="-1855.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1412" y="-1855.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(256)]</text>
+<polygon fill="none" stroke="black" points="1358,-1821 1358,-1846 1666,-1846
1666,-1821 1358,-1821"/>
+<text text-anchor="start" x="1363" y="-1830.8"
font-family="Helvetica,sans-Serif" font-size="14.00">queued_by_job_id</text>
+<text text-anchor="start" x="1487" y="-1830.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1492" y="-1830.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
+<polygon fill="none" stroke="black" points="1358,-1796 1358,-1821 1666,-1821
1666,-1796 1358,-1796"/>
+<text text-anchor="start" x="1363" y="-1805.8"
font-family="Helvetica,sans-Serif" font-size="14.00">queued_dttm</text>
+<text text-anchor="start" x="1456" y="-1805.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1461" y="-1805.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
+<polygon fill="none" stroke="black" points="1358,-1771 1358,-1796 1666,-1796
1666,-1771 1358,-1771"/>
+<text text-anchor="start" x="1363" y="-1780.8"
font-family="Helvetica,sans-Serif" font-size="14.00">rendered_map_index</text>
+<text text-anchor="start" x="1508" y="-1780.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1513" y="-1780.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(250)]</text>
+<polygon fill="none" stroke="black" points="1358,-1746 1358,-1771 1666,-1771
1666,-1746 1358,-1746"/>
+<text text-anchor="start" x="1363" y="-1755.8"
font-family="Helvetica,sans-Serif" font-size="14.00">run_id</text>
+<text text-anchor="start" x="1407" y="-1755.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1412" y="-1755.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(250)]</text>
+<text text-anchor="start" x="1533" y="-1755.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="1358,-1721 1358,-1746 1666,-1746
1666,-1721 1358,-1721"/>
+<text text-anchor="start" x="1363" y="-1730.8"
font-family="Helvetica,sans-Serif" font-size="14.00">start_date</text>
+<text text-anchor="start" x="1433" y="-1730.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1438" y="-1730.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
+<polygon fill="none" stroke="black" points="1358,-1696 1358,-1721 1666,-1721
1666,-1696 1358,-1696"/>
+<text text-anchor="start" x="1363" y="-1705.8"
font-family="Helvetica,sans-Serif" font-size="14.00">state</text>
+<text text-anchor="start" x="1398" y="-1705.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1403" y="-1705.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(20)]</text>
+<polygon fill="none" stroke="black" points="1358,-1671 1358,-1696 1666,-1696
1666,-1671 1358,-1671"/>
+<text text-anchor="start" x="1363" y="-1680.8"
font-family="Helvetica,sans-Serif" font-size="14.00">task_display_name</text>
+<text text-anchor="start" x="1495" y="-1680.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1500" y="-1680.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(2000)]</text>
+<polygon fill="none" stroke="black" points="1358,-1646 1358,-1671 1666,-1671
1666,-1646 1358,-1646"/>
+<text text-anchor="start" x="1363" y="-1655.8"
font-family="Helvetica,sans-Serif" font-size="14.00">task_id</text>
+<text text-anchor="start" x="1412" y="-1655.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1417" y="-1655.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(250)]</text>
+<text text-anchor="start" x="1538" y="-1655.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="1358,-1621 1358,-1646 1666,-1646
1666,-1621 1358,-1621"/>
+<text text-anchor="start" x="1363" y="-1630.8"
font-family="Helvetica,sans-Serif" font-size="14.00">trigger_id</text>
+<text text-anchor="start" x="1430" y="-1630.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1435" y="-1630.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
+<polygon fill="none" stroke="black" points="1358,-1596 1358,-1621 1666,-1621
1666,-1596 1358,-1596"/>
+<text text-anchor="start" x="1363" y="-1605.8"
font-family="Helvetica,sans-Serif" font-size="14.00">trigger_timeout</text>
+<text text-anchor="start" x="1471" y="-1605.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1476" y="-1605.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
+<polygon fill="none" stroke="black" points="1358,-1571 1358,-1596 1666,-1596
1666,-1571 1358,-1571"/>
+<text text-anchor="start" x="1363" y="-1580.8"
font-family="Helvetica,sans-Serif" font-size="14.00">try_number</text>
+<text text-anchor="start" x="1445" y="-1580.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1450" y="-1580.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [INTEGER]</text>
+<text text-anchor="start" x="1527" y="-1580.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text>
+<polygon fill="none" stroke="black" points="1358,-1546 1358,-1571 1666,-1571
1666,-1546 1358,-1546"/>
+<text text-anchor="start" x="1363" y="-1555.8"
font-family="Helvetica,sans-Serif" font-size="14.00">unixname</text>
+<text text-anchor="start" x="1433" y="-1555.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1438" y="-1555.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [VARCHAR(1000)]</text>
+<polygon fill="none" stroke="black" points="1358,-1521 1358,-1546 1666,-1546
1666,-1521 1358,-1521"/>
+<text text-anchor="start" x="1363" y="-1530.8"
font-family="Helvetica,sans-Serif" font-size="14.00">updated_at</text>
+<text text-anchor="start" x="1442" y="-1530.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> </text>
+<text text-anchor="start" x="1447" y="-1530.8"
font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text>
</g>
<!-- task_instance--task_instance_history -->
<g id="edge50" class="edge">
<title>task_instance--task_instance_history</title>
-<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2"
d="M1263.15,-2532.36C1267.82,-2520.44 1272.45,-2508.63 1277,-2497
1300.65,-2436.57 1325.57,-2372.69 1349.97,-2310.33"/>
-<text text-anchor="start" x="1318.97" y="-2299.13" font-family="Times,serif"
font-size="14.00">0..N</text>
-<text text-anchor="start" x="1253.15" y="-2521.16" font-family="Times,serif"
font-size="14.00">1</text>
+<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2"
d="M1265,-2536.49C1292.94,-2465.4 1321.99,-2392.03 1349.92,-2322"/>
+<text text-anchor="start" x="1318.92" y="-2310.8" font-family="Times,serif"
font-size="14.00">0..N</text>
+<text text-anchor="start" x="1265" y="-2525.29" font-family="Times,serif"
font-size="14.00">1</text>
</g>
<!-- task_instance--task_instance_history -->
<g id="edge51" class="edge">
<title>task_instance--task_instance_history</title>
-<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2"
d="M1265.1,-2545.38C1269.11,-2535.15 1273.08,-2525.01 1277,-2515
1300.65,-2454.57 1325.57,-2390.69 1349.97,-2328.12"/>
-<text text-anchor="start" x="1318.97" y="-2316.92" font-family="Times,serif"
font-size="14.00">0..N</text>
-<text text-anchor="start" x="1265.1" y="-2534.18" font-family="Times,serif"
font-size="14.00">1</text>
+<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2"
d="M1265,-2549.72C1292.94,-2478.99 1321.99,-2405.61 1349.92,-2335.23"/>
+<text text-anchor="start" x="1318.92" y="-2339.03" font-family="Times,serif"
font-size="14.00">0..N</text>
+<text text-anchor="start" x="1265" y="-2553.52" font-family="Times,serif"
font-size="14.00">1</text>
</g>
<!-- task_instance--task_instance_history -->
<g id="edge52" class="edge">
<title>task_instance--task_instance_history</title>
-<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2"
d="M1265.1,-2563.38C1269.11,-2553.15 1273.08,-2543.01 1277,-2533
1300.65,-2472.57 1325.57,-2408.69 1349.97,-2345.91"/>
-<text text-anchor="start" x="1318.97" y="-2334.71" font-family="Times,serif"
font-size="14.00">0..N</text>
-<text text-anchor="start" x="1265.1" y="-2552.18" font-family="Times,serif"
font-size="14.00">1</text>
+<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2"
d="M1265,-2562.96C1292.94,-2492.58 1321.99,-2419.2 1349.92,-2348.47"/>
+<text text-anchor="start" x="1318.92" y="-2352.27" font-family="Times,serif"
font-size="14.00">0..N</text>
+<text text-anchor="start" x="1265" y="-2566.76" font-family="Times,serif"
font-size="14.00">1</text>
</g>
<!-- task_instance--task_instance_history -->
<g id="edge53" class="edge">
<title>task_instance--task_instance_history</title>
-<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2"
d="M1265.1,-2581.38C1269.11,-2571.15 1273.08,-2561.01 1277,-2551
1302.03,-2487.04 1328.48,-2419.23 1354.24,-2352.7"/>
-<text text-anchor="start" x="1323.24" y="-2356.5" font-family="Times,serif"
font-size="14.00">0..N</text>
-<text text-anchor="start" x="1265.1" y="-2570.18" font-family="Times,serif"
font-size="14.00">1</text>
+<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2"
d="M1265,-2576.19C1294.05,-2503.38 1324.3,-2426.95 1353.25,-2353.24"/>
+<text text-anchor="start" x="1353.25" y="-2357.04" font-family="Times,serif"
font-size="14.00">0..N</text>
+<text text-anchor="start" x="1265" y="-2579.99" font-family="Times,serif"
font-size="14.00">1</text>
</g>
<!-- backfill -->
<g id="node33" class="node">
diff --git a/docs/apache-airflow/migrations-ref.rst
b/docs/apache-airflow/migrations-ref.rst
index f3441ceaf72..f133a67e08e 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are
executed via when you ru
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description
|
+=========================+==================+===================+==============================================================+
-| ``5f57a45b8433`` (head) | ``486ac7936b78`` | ``3.0.0`` | Drop
task_fail table. |
+| ``d8cd3297971e`` (head) | ``5f57a45b8433`` | ``3.0.0`` | Add
last_heartbeat_at directly to TI. |
++-------------------------+------------------+-------------------+--------------------------------------------------------------+
+| ``5f57a45b8433`` | ``486ac7936b78`` | ``3.0.0`` | Drop
task_fail table. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``486ac7936b78`` | ``d59cbbef95eb`` | ``3.0.0`` | remove
scheduler_lock column. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
diff --git a/scripts/ci/pre_commit/check_ti_vs_tis_attributes.py
b/scripts/ci/pre_commit/check_ti_vs_tis_attributes.py
index 1dfc51a0a04..16c1df48a9e 100755
--- a/scripts/ci/pre_commit/check_ti_vs_tis_attributes.py
+++ b/scripts/ci/pre_commit/check_ti_vs_tis_attributes.py
@@ -52,6 +52,8 @@ def compare_attributes(path1, path2):
"triggerer_job",
"note",
"rendered_task_instance_fields",
+ # Storing last heartbeat for historic TIs is not interesting/useful
+ "last_heartbeat_at",
} # exclude attrs not necessary to be in TaskInstanceHistory
if not diff:
return
diff --git
a/tests/api_connexion/endpoints/test_mapped_task_instance_endpoint.py
b/tests/api_connexion/endpoints/test_mapped_task_instance_endpoint.py
index 237ef5910c7..68ecd1e8389 100644
--- a/tests/api_connexion/endpoints/test_mapped_task_instance_endpoint.py
+++ b/tests/api_connexion/endpoints/test_mapped_task_instance_endpoint.py
@@ -77,7 +77,6 @@ class TestMappedTaskInstanceEndpoint:
"duration": 10000,
"pool": "default_pool",
"queue": "default_queue",
- "job_id": 0,
}
self.app = configured_app
self.client = self.app.test_client() # type:ignore
diff --git a/tests/api_connexion/endpoints/test_task_instance_endpoint.py
b/tests/api_connexion/endpoints/test_task_instance_endpoint.py
index bc8836981d4..e1fa6d13b74 100644
--- a/tests/api_connexion/endpoints/test_task_instance_endpoint.py
+++ b/tests/api_connexion/endpoints/test_task_instance_endpoint.py
@@ -81,7 +81,6 @@ class TestTaskInstanceEndpoint:
"duration": 10000,
"pool": "default_pool",
"queue": "default_queue",
- "job_id": 0,
}
self.app = configured_app
self.client = self.app.test_client() # type:ignore
diff --git a/tests/api_fastapi/core_api/routes/public/test_task_instances.py
b/tests/api_fastapi/core_api/routes/public/test_task_instances.py
index fa9cc0b161d..717f17ca278 100644
--- a/tests/api_fastapi/core_api/routes/public/test_task_instances.py
+++ b/tests/api_fastapi/core_api/routes/public/test_task_instances.py
@@ -68,7 +68,6 @@ class TestTaskInstanceEndpoint:
"duration": 10000,
"pool": "default_pool",
"queue": "default_queue",
- "job_id": 0,
}
clear_db_runs()
clear_rendered_ti_fields()
diff --git a/tests/assets/test_manager.py b/tests/assets/test_manager.py
index eb12f281606..3310502a97b 100644
--- a/tests/assets/test_manager.py
+++ b/tests/assets/test_manager.py
@@ -59,6 +59,7 @@ def clear_assets():
@pytest.fixture
def mock_task_instance():
return TaskInstancePydantic(
+ id="1",
task_id="5",
dag_id="7",
run_id="11",
diff --git a/tests/cli/commands/test_task_command.py
b/tests/cli/commands/test_task_command.py
index 9b605e818d8..5a4e0b27924 100644
--- a/tests/cli/commands/test_task_command.py
+++ b/tests/cli/commands/test_task_command.py
@@ -932,7 +932,7 @@ class TestLogsfromTaskRunCommand:
print(logs) # In case of a test failures this line would show
detailed log
logs_list = logs.splitlines()
- assert f"Subtask {self.task_id}" in logs
+ assert f"Task {self.task_id}" in logs
assert "standard_task_runner.py" in logs
self.assert_log_line("Log from DAG Logger", logs_list)
self.assert_log_line("Log from TI Logger", logs_list)
diff --git a/tests/executors/test_debug_executor.py
b/tests/executors/test_debug_executor.py
index 20ee821842c..a8ad6679576 100644
--- a/tests/executors/test_debug_executor.py
+++ b/tests/executors/test_debug_executor.py
@@ -50,7 +50,7 @@ class TestDebugExecutor:
succeeded = executor._run_task(task_instance_mock)
assert succeeded
- task_instance_mock.run.assert_called_once_with(job_id=job_id)
+ task_instance_mock.run.assert_called()
def test_queue_task_instance(self):
key = "ti_key"
diff --git a/tests/jobs/test_local_task_job.py
b/tests/jobs/test_local_task_job.py
index 84a7465a823..7ee03747883 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -131,17 +131,21 @@ class TestLocalTaskJob:
assert all(check_result_2)
@pytest.mark.skip_if_database_isolation_mode # Does not work in db
isolation mode
- def test_localtaskjob_heartbeat(self, dag_maker):
+ def test_localtaskjob_heartbeat(self, dag_maker, time_machine):
session = settings.Session()
with dag_maker("test_localtaskjob_heartbeat"):
op1 = EmptyOperator(task_id="op1")
+ time_machine.move_to(DEFAULT_DATE, tick=False)
+
dr = dag_maker.create_dagrun()
ti = dr.get_task_instance(task_id=op1.task_id, session=session)
ti.state = State.RUNNING
ti.hostname = "blablabla"
session.commit()
+ assert ti.last_heartbeat_at is None, "Pre-conditioncheck"
+
job1 = Job(dag_id=ti.dag_id, executor=SequentialExecutor())
job_runner = LocalTaskJobRunner(job=job1, task_instance=ti,
ignore_ti_state=True)
ti.task = op1
@@ -149,9 +153,12 @@ class TestLocalTaskJob:
job1.task_runner = StandardTaskRunner(job_runner)
job1.task_runner.process = mock.Mock()
job_runner.task_runner = job1.task_runner
- with pytest.raises(AirflowException):
+ with pytest.raises(AirflowException, match="Hostname .* does not
match"):
job_runner.heartbeat_callback()
+ ti = session.get(TaskInstance, (ti.id,))
+ assert ti.last_heartbeat_at is None, "Should still be none"
+
job1.task_runner.process.pid = 1
ti.state = State.RUNNING
ti.hostname = get_hostname()
@@ -164,19 +171,22 @@ class TestLocalTaskJob:
job_runner.heartbeat_callback(session=None)
job1.task_runner.process.pid = 2
- with pytest.raises(AirflowException):
+ with pytest.raises(AirflowException, match="PID .* does not match"):
job_runner.heartbeat_callback()
# Now, set the ti.pid to None and test that no error
# is raised.
ti.pid = None
- session.merge(ti)
+ ti = session.merge(ti)
session.commit()
assert ti.pid != job1.task_runner.process.pid
assert not ti.run_as_user
assert not job1.task_runner.run_as_user
job_runner.heartbeat_callback()
+ ti = session.get(TaskInstance, (ti.id,))
+ assert ti.last_heartbeat_at == DEFAULT_DATE
+
@pytest.mark.skip_if_database_isolation_mode # Does not work in db
isolation mode
@mock.patch("subprocess.check_call")
@mock.patch("airflow.jobs.local_task_job_runner.psutil")
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 311de0ce2b6..da3ccc201eb 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -50,7 +50,6 @@ from airflow.executors.base_executor import BaseExecutor
from airflow.executors.executor_constants import MOCK_EXECUTOR
from airflow.executors.executor_loader import ExecutorLoader
from airflow.jobs.job import Job, run_job
-from airflow.jobs.local_task_job_runner import LocalTaskJobRunner
from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
from airflow.models.asset import AssetActive, AssetDagRunQueue, AssetEvent,
AssetModel
from airflow.models.backfill import Backfill, _create_backfill
@@ -68,7 +67,7 @@ from airflow.timetables.base import DataInterval
from airflow.utils import timezone
from airflow.utils.file import list_py_file_paths
from airflow.utils.session import create_session, provide_session
-from airflow.utils.state import DagRunState, JobState, State, TaskInstanceState
+from airflow.utils.state import DagRunState, State, TaskInstanceState
from airflow.utils.types import DagRunType
from tests.listeners import dag_listener
@@ -5665,16 +5664,10 @@ class TestSchedulerJob:
for task_id in tasks_to_setup:
task = dag.get_task(task_id=task_id)
ti = TaskInstance(task, run_id=dag_run.run_id,
state=State.RUNNING)
- ti.queued_by_job_id = 999
-
- local_job = Job(dag_id=ti.dag_id)
- LocalTaskJobRunner(job=local_job, task_instance=ti)
- local_job.state = TaskInstanceState.FAILED
- session.add(local_job)
- session.flush()
+ ti.last_heartbeat_at = timezone.utcnow() - timedelta(minutes=6)
+ ti.queued_by_job_id = 999
- ti.job_id = local_job.id
session.add(ti)
session.flush()
@@ -5733,13 +5726,6 @@ class TestSchedulerJob:
ti = TaskInstance(task, run_id=dag_run.run_id,
state=State.RUNNING)
ti.queued_by_job_id = 999
- local_job = Job(dag_id=ti.dag_id)
- local_job.state = TaskInstanceState.FAILED
-
- session.add(local_job)
- session.flush()
-
- ti.job_id = local_job.id
session.add(ti)
session.flush()
@@ -5795,17 +5781,11 @@ class TestSchedulerJob:
task = dag.get_task(task_id="run_this_last")
ti = TaskInstance(task, run_id=dag_run.run_id, state=State.RUNNING)
-
- local_job = Job(dag_id=ti.dag_id)
- LocalTaskJobRunner(job=local_job, task_instance=ti)
- local_job.state = JobState.FAILED
- session.add(local_job)
- session.flush()
+ ti.last_heartbeat_at = timezone.utcnow() - timedelta(minutes=6)
# TODO: If there was an actual Relationship between TI and Job
# we wouldn't need this extra commit
session.add(ti)
- ti.job_id = local_job.id
session.flush()
scheduler_job = Job()
diff --git a/tests/models/test_taskinstance.py
b/tests/models/test_taskinstance.py
index ccd19ad3272..8a1df0594e4 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -3993,7 +3993,6 @@ class TestTaskInstance:
"hostname": "some_unique_hostname",
"id": str(uuid6.uuid7()),
"unixname": "some_unique_unixname",
- "job_id": 1234,
"pool": "some_fake_pool_id",
"pool_slots": 25,
"queue": "some_queue_id",
@@ -4004,6 +4003,7 @@ class TestTaskInstance:
"rendered_map_index": None,
"queued_by_job_id": 321,
"pid": 123,
+ "last_heartbeat_at": run_date + datetime.timedelta(hours=1,
seconds=4),
"executor": "some_executor",
"executor_config": {"Some": {"extra": "information"}},
"external_executor_id": "some_executor_id",
diff --git a/tests/www/views/test_views_tasks.py
b/tests/www/views/test_views_tasks.py
index fabd104e8c2..19caafe55bc 100644
--- a/tests/www/views/test_views_tasks.py
+++ b/tests/www/views/test_views_tasks.py
@@ -1109,7 +1109,7 @@ def test_task_instances(admin_client):
"external_executor_id": None,
"hostname": "",
"id": unittest.mock.ANY, # Ignore the `id` field
- "job_id": None,
+ "last_heartbeat_at": None,
"map_index": -1,
"max_tries": 0,
"next_kwargs": None,
@@ -1145,7 +1145,7 @@ def test_task_instances(admin_client):
"external_executor_id": None,
"hostname": "",
"id": unittest.mock.ANY, # Ignore the `id` field
- "job_id": None,
+ "last_heartbeat_at": None,
"map_index": -1,
"max_tries": 0,
"next_kwargs": None,
@@ -1181,7 +1181,7 @@ def test_task_instances(admin_client):
"external_executor_id": None,
"hostname": "",
"id": unittest.mock.ANY, # Ignore the `id` field
- "job_id": None,
+ "last_heartbeat_at": None,
"map_index": -1,
"max_tries": 0,
"next_kwargs": None,
@@ -1217,7 +1217,7 @@ def test_task_instances(admin_client):
"external_executor_id": None,
"hostname": "",
"id": unittest.mock.ANY, # Ignore the `id` field
- "job_id": None,
+ "last_heartbeat_at": None,
"map_index": -1,
"max_tries": 0,
"next_kwargs": None,
@@ -1253,7 +1253,7 @@ def test_task_instances(admin_client):
"external_executor_id": None,
"hostname": "",
"id": unittest.mock.ANY, # Ignore the `id` field
- "job_id": None,
+ "last_heartbeat_at": None,
"map_index": -1,
"max_tries": 0,
"next_kwargs": None,
@@ -1289,7 +1289,7 @@ def test_task_instances(admin_client):
"external_executor_id": None,
"hostname": "",
"id": unittest.mock.ANY, # Ignore the `id` field
- "job_id": None,
+ "last_heartbeat_at": None,
"map_index": -1,
"max_tries": 0,
"next_kwargs": None,
@@ -1325,7 +1325,7 @@ def test_task_instances(admin_client):
"external_executor_id": None,
"hostname": "",
"id": unittest.mock.ANY, # Ignore the `id` field
- "job_id": None,
+ "last_heartbeat_at": None,
"map_index": -1,
"max_tries": 0,
"next_kwargs": None,