Kevin Gao created AIRFLOW-648:
---------------------------------

             Summary: Using the LatestOnlyOperator results in tasks being 
reenqueued many times over
                 Key: AIRFLOW-648
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-648
             Project: Apache Airflow
          Issue Type: Bug
          Components: operators, scheduler
    Affects Versions: Airflow 1.8
         Environment: Linux 3.2.0-109-virtual #150-Ubuntu SMP x86_64
Python Version: 2.7.3
Airflow Version: 1.7.1.3 with plugin to bring in LatestOnlyOperator
CeleryExecutor with Redis as a backend
Airflow Config (subset):
{code}
[core]
executor = CeleryExecutor
parallelism = 32
dag_concurrency = 16
dags_are_paused_at_creation = False
max_active_runs_per_dag = 16

[scheduler]
job_heartbeat_sec = 5
scheduler_heartbeat_sec = 5
{code}
            Reporter: Kevin Gao


We ported a number of our cronjobs over to Airflow. To avoid actually executing the backfilled runs, we use the {{LatestOnlyOperator}} that was merged to master in pull request 1752: https://github.com/apache/incubator-airflow/pull/1752.

When we moved the cronjobs over, we migrated many at a time (using Ansible). These DAGs had start dates going back a few days.

The first thing I noticed was that processing the backfilled DAG runs seemed to take a long time. They were being processed correctly, in the sense that the {{'latest_only'}} task was completing successfully and the downstream {{BashOperator}} was being marked as skipped. The DAG runs also appeared to complete successfully in the tree view. However, when I filtered the DAG runs on {{state contains running}}, many of them still showed up.

I also noticed that, in the logs for one of the "stuck" DAG runs, the {{'latest_only'}} task appeared to have been processed multiple times.
{code}
[2016-11-22 12:26:27,701] {models.py:154} INFO - Filling up the DagBag from 
/DAGDIR/my_dag.py
[2016-11-22 12:28:50,335] {models.py:154} INFO - Filling up the DagBag from 
/DAGDIR/my_dag.py
[2016-11-22 12:28:53,288] {models.py:154} INFO - Filling up the DagBag from 
/DAGDIR/my_dag.py
[2016-11-22 12:28:58,400] {models.py:1196} INFO -
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------

[2016-11-22 12:28:59,334] {models.py:1219} INFO - Executing 
<Task(LatestOnlyOperator): latest_only> on 2016-11-20 04:00:00
[2016-11-22 12:29:00,671] {airflow_next.py:27} INFO - Checking latest only with 
left_window: 2016-11-21 04:00:00 right_window: 2016-11-22 04:00:00 now: 
2016-11-22 12:29:00.670321
[2016-11-22 12:29:00,671] {airflow_next.py:29} INFO - Not latest execution, 
skipping downstream.
[2016-11-22 12:29:00,672] {airflow_next.py:34} INFO - Skipping task: my_dag
[2016-11-22 12:29:01,397] {airflow_next.py:41} INFO - Done.
[2016-11-22 12:31:13,055] {models.py:154} INFO - Filling up the DagBag from 
/DAGDIR/my_dag.py
[2016-11-22 12:31:17,899] {models.py:1150} INFO - Task <TaskInstance: 
my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 
2016-11-22 12:29:01.397870
[2016-11-22 12:32:31,907] {models.py:154} INFO - Filling up the DagBag from 
/DAGDIR/my_dag.py
[2016-11-22 12:34:56,522] {models.py:154} INFO - Filling up the DagBag from 
/DAGDIR/my_dag.py
[2016-11-22 12:35:00,975] {models.py:1150} INFO - Task <TaskInstance: 
my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 
2016-11-22 12:29:01.397870
[2016-11-22 12:35:36,323] {models.py:154} INFO - Filling up the DagBag from 
/DAGDIR/my_dag.py
[2016-11-22 12:38:00,140] {models.py:154} INFO - Filling up the DagBag from 
/DAGDIR/my_dag.py
[2016-11-22 12:38:05,057] {models.py:1150} INFO - Task <TaskInstance: 
my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 
2016-11-22 12:29:01.397870
[2016-11-22 12:38:50,014] {models.py:154} INFO - Filling up the DagBag from 
/DAGDIR/my_dag.py
[2016-11-22 12:41:07,609] {models.py:154} INFO - Filling up the DagBag from 
/DAGDIR/my_dag.py
[2016-11-22 12:41:12,232] {models.py:1150} INFO - Task <TaskInstance: 
my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 
2016-11-22 12:29:01.397870
[2016-11-22 12:41:45,857] {models.py:154} INFO - Filling up the DagBag from 
/DAGDIR/my_dag.py
[2016-11-22 12:44:05,354] {models.py:154} INFO - Filling up the DagBag from 
/DAGDIR/my_dag.py
[2016-11-22 12:44:09,635] {models.py:1150} INFO - Task <TaskInstance: 
my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 
2016-11-22 12:29:01.397870
[2016-11-22 12:44:30,851] {models.py:154} INFO - Filling up the DagBag from 
/DAGDIR/my_dag.py
[2016-11-22 12:46:58,977] {models.py:154} INFO - Filling up the DagBag from 
/DAGDIR/my_dag.py
[2016-11-22 12:47:02,836] {models.py:1150} INFO - Task <TaskInstance: 
my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 
2016-11-22 12:29:01.397870
[2016-11-22 12:48:27,571] {models.py:154} INFO - Filling up the DagBag from 
/DAGDIR/my_dag.py
[2016-11-22 12:50:54,034] {models.py:154} INFO - Filling up the DagBag from 
/DAGDIR/my_dag.py
[2016-11-22 12:50:57,951] {models.py:1150} INFO - Task <TaskInstance: 
my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 
2016-11-22 12:29:01.397870
[2016-11-22 12:51:21,442] {models.py:154} INFO - Filling up the DagBag from 
/DAGDIR/my_dag.py
[2016-11-22 12:53:44,461] {models.py:154} INFO - Filling up the DagBag from 
/DAGDIR/my_dag.py
[2016-11-22 12:53:48,392] {models.py:1150} INFO - Task <TaskInstance: 
my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 
2016-11-22 12:29:01.397870
[2016-11-22 12:54:28,745] {models.py:154} INFO - Filling up the DagBag from 
/DAGDIR/my_dag.py
[2016-11-22 12:56:50,740] {models.py:154} INFO - Filling up the DagBag from 
/DAGDIR/my_dag.py
[2016-11-22 12:56:54,382] {models.py:1150} INFO - Task <TaskInstance: 
my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 
2016-11-22 12:29:01.397870
[2016-11-22 12:57:59,881] {models.py:154} INFO - Filling up the DagBag from 
/DAGDIR/my_dag.py
[2016-11-22 12:59:04,245] {models.py:154} INFO - Filling up the DagBag from 
/DAGDIR/my_dag.py
[2016-11-22 12:59:05,666] {models.py:1150} INFO - Task <TaskInstance: 
my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 
2016-11-22 12:29:01.397870
[2016-11-22 13:02:18,434] {models.py:154} INFO - Filling up the DagBag from 
/DAGDIR/my_dag.py
{code}

We use the CeleryExecutor backed by Redis. Inspecting the Redis key that holds the queue confirmed that duplicate tasks were present; there were thousands of entries in the queue. Here are the tasks at the head of the list (DAG names changed for readability):
{code}
[
 u"['airflow run DAG_01 latest_only 2016-11-18T09:23:00 --local -sd 
DAGS_FOLDER/DAG_01.py ']",
 u"['airflow run DAG_02 latest_only 2016-11-18T03:45:00 --local -sd 
DAGS_FOLDER/DAG_02.py ']",
 u"['airflow run DAG_03 latest_only 2016-11-18T00:08:00 --local -sd 
DAGS_FOLDER/DAG_03.py ']",
 u"['airflow run DAG_04 latest_only 2016-11-22T10:40:00 --local -sd 
DAGS_FOLDER/DAG_04.py ']",
 u"['airflow run DAG_05 latest_only 2016-11-18T05:00:00 --local -sd 
DAGS_FOLDER/DAG_05.py ']",
 u"['airflow run DAG_06 latest_only 2016-11-22T11:20:00 --local -sd 
DAGS_FOLDER/DAG_06.py ']",
 u"['airflow run DAG_07 latest_only 2016-11-18T10:14:00 --local -sd 
DAGS_FOLDER/DAG_07.py ']",
 u"['airflow run DAG_01 latest_only 2016-11-18T09:23:00 --local -sd 
DAGS_FOLDER/DAG_01.py ']",
 u"['airflow run DAG_02 latest_only 2016-11-18T03:45:00 --local -sd 
DAGS_FOLDER/DAG_02.py ']",
 u"['airflow run DAG_03 latest_only 2016-11-18T00:08:00 --local -sd 
DAGS_FOLDER/DAG_03.py ']",
 u"['airflow run DAG_04 latest_only 2016-11-22T10:40:00 --local -sd 
DAGS_FOLDER/DAG_04.py ']",
 u"['airflow run DAG_05 latest_only 2016-11-18T05:00:00 --local -sd 
DAGS_FOLDER/DAG_05.py ']",
 u"['airflow run DAG_06 latest_only 2016-11-22T11:20:00 --local -sd 
DAGS_FOLDER/DAG_06.py ']",
 u"['airflow run DAG_07 latest_only 2016-11-18T10:14:00 --local -sd 
DAGS_FOLDER/DAG_07.py ']"
]
{code}
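The duplication is easy to quantify once the queue's list key has been dumped. A minimal sketch, using an illustrative subset of the commands shown above rather than the full dump:

```python
from collections import Counter

# Illustrative subset of the dumped queue contents; the real queue held
# thousands of entries.
queued = [
    "airflow run DAG_01 latest_only 2016-11-18T09:23:00 --local -sd DAGS_FOLDER/DAG_01.py",
    "airflow run DAG_02 latest_only 2016-11-18T03:45:00 --local -sd DAGS_FOLDER/DAG_02.py",
    "airflow run DAG_01 latest_only 2016-11-18T09:23:00 --local -sd DAGS_FOLDER/DAG_01.py",
    "airflow run DAG_02 latest_only 2016-11-18T03:45:00 --local -sd DAGS_FOLDER/DAG_02.py",
]

# Any command with a count above 1 means the same task instance
# (dag_id + task_id + execution_date) was enqueued more than once.
duplicates = {cmd: n for cmd, n in Counter(queued).items() if n > 1}
```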

Grepping the scheduler's logs shows the scheduler enqueuing the same task more than once; here is one instance:
{code}
$ grep -A2 "DAG_01" /var/log/airflow/airflow-scheduler.log | grep -A2 "09:23"
[2016-11-22 13:24:26,660] {models.py:2660} INFO - Checking state for <DagRun 
DAG_01 @ 2016-11-18 09:23:00: scheduled__2016-11-18T09:23:00, externally 
triggered: False>
[2016-11-22 13:24:26,672] {jobs.py:498} INFO - Getting list of tasks to skip 
for active runs.
[2016-11-22 13:24:26,678] {jobs.py:514} INFO - Checking dependencies on 2 tasks 
instances, minus 0 skippable ones
[2016-11-22 13:24:26,726] {base_executor.py:36} INFO - Adding to queue: airflow 
run DAG_01 latest_only 2016-11-18T09:23:00 --local -sd DAGS_FOLDER/DAG_01.py
[2016-11-22 13:24:26,769] {jobs.py:498} INFO - Getting list of tasks to skip 
for active runs.
[2016-11-22 13:24:26,769] {jobs.py:514} INFO - Checking dependencies on 0 tasks 
instances, minus 0 skippable ones
--
[2016-11-22 13:24:31,830] {models.py:2660} INFO - Checking state for <DagRun 
DAG_01 @ 2016-11-18 09:23:00: scheduled__2016-11-18T09:23:00, externally 
triggered: False>
[2016-11-22 13:24:31,832] {jobs.py:498} INFO - Getting list of tasks to skip 
for active runs.
[2016-11-22 13:24:31,832] {jobs.py:514} INFO - Checking dependencies on 0 tasks 
instances, minus 0 skippable ones
--
[2016-11-22 13:24:37,238] {models.py:2660} INFO - Checking state for <DagRun 
DAG_01 @ 2016-11-18 09:23:00: scheduled__2016-11-18T09:23:00, externally 
triggered: False>
[2016-11-22 13:24:37,240] {jobs.py:498} INFO - Getting list of tasks to skip 
for active runs.
[2016-11-22 13:24:37,252] {jobs.py:498} INFO - Getting list of tasks to skip 
for active runs.
--
[2016-11-22 13:24:45,736] {models.py:2660} INFO - Checking state for <DagRun 
DAG_01 @ 2016-11-18 09:23:00: scheduled__2016-11-18T09:23:00, externally 
triggered: False>
[2016-11-22 13:24:45,744] {jobs.py:498} INFO - Getting list of tasks to skip 
for active runs.
[2016-11-22 13:24:45,756] {jobs.py:514} INFO - Checking dependencies on 2 tasks 
instances, minus 0 skippable ones
--
[2016-11-22 13:24:56,613] {models.py:2660} INFO - Checking state for <DagRun 
DAG_01 @ 2016-11-18 09:23:00: scheduled__2016-11-18T09:23:00, externally 
triggered: False>
[2016-11-22 13:24:56,624] {jobs.py:498} INFO - Getting list of tasks to skip 
for active runs.
[2016-11-22 13:24:56,638] {jobs.py:514} INFO - Checking dependencies on 2 tasks 
instances, minus 0 skippable ones
--
[2016-11-22 13:24:56,680] {base_executor.py:36} INFO - Adding to queue: airflow 
run DAG_01 latest_only 2016-11-18T09:23:00 --local -sd DAGS_FOLDER/DAG_01.py
[2016-11-22 13:24:56,823] {jobs.py:498} INFO - Getting list of tasks to skip 
for active runs.
[2016-11-22 13:24:56,824] {jobs.py:514} INFO - Checking dependencies on 0 tasks 
instances, minus 0 skippable ones
{code}

Eventually, we worked around the problem by creating new DAG definitions with future start dates and manually clearing the Redis queue.

*Additional context*:

Our scheduler is daemonized via Upstart and runs with {{-n 5}}.

Here is the template we use for our cron DAGs (note that it is a Jinja2 template rendered by Ansible):
{code}
# {{ansible_managed}}

from dateutil import parser

from airflow.operators import LatestOnlyOperator
from airflow.operators import BashOperator
from airflow.models import DAG

args = {
    'owner': 'airflow',
    'start_date': parser.parse('{{item.start_date}}'),
    'retries': 0,
}

dag = DAG(
  dag_id='{{item.name}}',
  default_args=args,
  schedule_interval='{{item.schedule}}',
  max_active_runs=1,
)

latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)

script = BashOperator(
  task_id='{{item.name}}',
  bash_command='{{item.command}}',
  default_args=args,
  dag=dag,
)

script.set_upstream(latest_only)
{code}

One thing to note: we are on Airflow 1.7.1.3; the operator was brought in through a plugin:
{code}
import datetime
import logging

from airflow.models import BaseOperator, TaskInstance
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.state import State
from airflow import settings


class LatestOnlyOperator(BaseOperator):
    """
    Allows a workflow to skip tasks that are not running during the most
    recent schedule interval.
    If the task is run outside of the latest schedule interval, all
    directly downstream tasks will be skipped.
    """

    ui_color = '#e9ffdb'  # nyanza

    def execute(self, context):
        now = datetime.datetime.now()
        left_window = context['dag'].following_schedule(
            context['execution_date'])
        right_window = context['dag'].following_schedule(left_window)
        logging.info(
            'Checking latest only with left_window: %s right_window: %s '
            'now: %s', left_window, right_window, now)
        if not left_window < now <= right_window:
            logging.info('Not latest execution, skipping downstream.')
            session = settings.Session()
            for task in context['task'].downstream_list:
                ti = TaskInstance(
                    task, execution_date=context['ti'].execution_date)
                logging.info('Skipping task: %s', ti.task_id)
                ti.state = State.SKIPPED
                ti.start_date = now
                ti.end_date = now
                session.merge(ti)
            session.commit()
            session.close()
            logging.info('Done.')
        else:
            logging.info('Latest, allowing execution to proceed.')


class AirflowNextPlugin(AirflowPlugin):
    name = "airflow_next"
    operators = [LatestOnlyOperator]
{code}
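For reference, the window check in {{execute()}} can be exercised in isolation. This is a sketch that assumes a fixed {{timedelta}} schedule (the real operator derives both windows via {{dag.following_schedule()}}, which also handles cron expressions), using the timestamps from the stuck run's log above:

```python
import datetime

def is_latest(execution_date, schedule_interval, now):
    # Mirror of the operator's check: a run is "latest" only if now falls
    # inside (left_window, right_window], where left_window is the schedule
    # following execution_date and right_window the one after that.
    left_window = execution_date + schedule_interval
    right_window = left_window + schedule_interval
    return left_window < now <= right_window

day = datetime.timedelta(days=1)

# Stuck run: execution_date 2016-11-20 04:00 checked at 2016-11-22 12:29.
# Its window (2016-11-21 04:00, 2016-11-22 04:00] had already closed, so
# downstream tasks were skipped.
stale = is_latest(datetime.datetime(2016, 11, 20, 4), day,
                  datetime.datetime(2016, 11, 22, 12, 29))   # False

# The most recent run at that moment would have passed the check.
latest = is_latest(datetime.datetime(2016, 11, 21, 4), day,
                   datetime.datetime(2016, 11, 22, 12, 29))  # True
```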



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
