Repository: incubator-airflow Updated Branches: refs/heads/master b532d8d77 -> 4f20f6077
[AIRFLOW-1366] Add max_tries to task instance Right now Airflow deletes the task instance when a user clears it. We have no way of keeping track of how many times a task instance gets run, either by a user or by itself. So instead of deleting the task instance record, we should keep the task instance and make try_number monotonically increasing for every task instance attempt. max_tries is introduced as an upper bound for retrying tasks by the task itself. This new column will be used to update logic behind clear_task_instances. db migration is tested locally. Closes #2409 from AllisonWang/allison--max-tries Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4f20f607 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4f20f607 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4f20f607 Branch: refs/heads/master Commit: 4f20f607764bb3477419321b5dfd0c53ba1db3c0 Parents: b532d8d Author: AllisonWang <[email protected]> Authored: Mon Jul 10 15:26:08 2017 -0700 Committer: Dan Davydov <[email protected]> Committed: Mon Jul 10 15:26:12 2017 -0700 ---------------------------------------------------------------------- ...dc7_add_max_tries_column_to_task_instance.py | 106 +++++++++++++++++++ airflow/models.py | 3 + 2 files changed, 109 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4f20f607/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py ---------------------------------------------------------------------- diff --git a/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py b/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py new file mode 100644 index 0000000..2d5ffc2 --- /dev/null +++ b/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py @@ -0,0 
+1,106 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""add max tries column to task instance + +Revision ID: cc1e65623dc7 +Revises: 127d2bf2dfa7 +Create Date: 2017-06-19 16:53:12.851141 + +""" + +# revision identifiers, used by Alembic. +revision = 'cc1e65623dc7' +down_revision = '127d2bf2dfa7' +branch_labels = None +depends_on = None + +from alembic import op +import sqlalchemy as sa +from airflow import settings +from airflow.models import DagBag, TaskInstance + +BATCH_SIZE = 5000 + +def upgrade(): + op.add_column('task_instance', sa.Column('max_tries', sa.Integer, + server_default="-1")) + # Check if table task_instance exist before data migration. This check is + # needed for database that does not create table until migration finishes. + # Checking task_instance table exists prevent the error of querying + # non-existing task_instance table. + engine = settings.engine + if engine.dialect.has_table(engine, 'task_instance'): + # Get current session + connection = op.get_bind() + sessionmaker = sa.orm.sessionmaker() + session = sessionmaker(bind=connection) + dagbag = DagBag(settings.DAGS_FOLDER) + query = session.query(sa.func.count(TaskInstance.max_tries)).filter( + TaskInstance.max_tries == -1 + ) + # Separate db query in batch to prevent loading entire table + # into memory and cause out of memory error. 
+ while query.scalar(): + tis = session.query(TaskInstance).filter( + TaskInstance.max_tries == -1 + ).limit(BATCH_SIZE).all() + for ti in tis: + dag = dagbag.get_dag(ti.dag_id) + if not dag or not dag.has_task(ti.task_id): + # task_instance table might not have the up-to-date + # information, i.e dag or task might be modified or + # deleted in dagbag but is reflected in task instance + # table. In this case we do not retry the task that can't + # be parsed. + ti.max_tries = ti.try_number + else: + task = dag.get_task(ti.task_id) + ti.max_tries = task.retries + session.merge(ti) + session.commit() + # Commit the current session. + session.commit() + + +def downgrade(): + engine = settings.engine + if engine.dialect.has_table(engine, 'task_instance'): + connection = op.get_bind() + sessionmaker = sa.orm.sessionmaker() + session = sessionmaker(bind=connection) + dagbag = DagBag(settings.DAGS_FOLDER) + query = session.query(sa.func.count(TaskInstance.max_tries)).filter( + TaskInstance.max_tries != -1 + ) + while query.scalar(): + tis = session.query(TaskInstance).filter( + TaskInstance.max_tries != -1 + ).limit(BATCH_SIZE).all() + for ti in tis: + dag = dagbag.get_dag(ti.dag_id) + if not dag or not dag.has_task(ti.task_id): + ti.try_number = 0 + else: + task = dag.get_task(ti.task_id) + # max_tries - try_number is number of times a task instance + # left to retry by itself. So the current try_number should be + # max number of self retry (task.retries) minus number of + # times left for task instance to try the task. 
+ ti.try_number = max(0, task.retries - (ti.max_tries - + ti.try_number)) + ti.max_tries = -1 + session.merge(ti) + session.commit() + session.commit() + op.drop_column('task_instance', 'max_tries') http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4f20f607/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index 8566b7f..32ad144 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -754,6 +754,7 @@ class TaskInstance(Base): duration = Column(Float) state = Column(String(20)) try_number = Column(Integer, default=0) + max_tries = Column(Integer) hostname = Column(String(1000)) unixname = Column(String(1000)) job_id = Column(Integer) @@ -780,6 +781,7 @@ class TaskInstance(Base): self.pool = task.pool self.priority_weight = task.priority_weight_total self.try_number = 0 + self.max_tries = self.task.retries self.unixname = getpass.getuser() self.run_as_user = task.run_as_user if state: @@ -1021,6 +1023,7 @@ class TaskInstance(Base): self.start_date = ti.start_date self.end_date = ti.end_date self.try_number = ti.try_number + self.max_tries = ti.max_tries self.hostname = ti.hostname self.pid = ti.pid else:
