[
https://issues.apache.org/jira/browse/AIRFLOW-6190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989735#comment-16989735
]
Bjorn Olsen commented on AIRFLOW-6190:
--------------------------------------
I changed the func to include a random-duration sleep, and this triggers the
error more consistently. This again suggests a timing issue between the
scheduler and the worker.
{code:python}
import random
import time

def func():
    # Sleep for a random 0-10 seconds to widen the scheduler/worker race window
    time.sleep(random.randint(0, 10))
{code}
> Task instances queued and dequeued before worker is ready, causing
> intermittently failed tasks
> ----------------------------------------------------------------------------------------------
>
> Key: AIRFLOW-6190
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6190
> Project: Apache Airflow
> Issue Type: Bug
> Components: core
> Affects Versions: 1.10.6
> Reporter: Bjorn Olsen
> Assignee: Bjorn Olsen
> Priority: Minor
> Attachments: image-2019-12-06-13-55-33-974.png
>
>
> The DAG below creates 20 identical simple tasks that depend on each other
> in series.
> Installing the DAG and executing all the DAG runs works perfectly the first
> time around.
> Then pausing the DAG, clearing all the dag runs, and unpausing the DAG leads
> to intermittent task failures.
> Edit: This isn't specifically tied to the first and second rounds; it seems
> to randomly affect an entire set of dag runs, or not affect the set at all.
> This makes me suspect a timing issue between the executor and the scheduler
> (sometimes they align and sometimes they don't).
> {code:python}
> from builtins import range
>
> import airflow
> from airflow.models import DAG
> from airflow.operators.python_operator import PythonOperator
>
> args = {
>     'owner': 'airflow',
>     'start_date': airflow.utils.dates.days_ago(5),
> }
>
> dag = DAG(
>     dag_id='bug_testing_dag',
>     default_args=args,
>     schedule_interval='@daily',
>     max_active_runs=1,
> )
>
> def func():
>     pass
>
> # Chain 20 identical no-op tasks in series: task_0 >> task_1 >> ... >> task_19
> prev_task = None
> for i in range(0, 20):
>     task = PythonOperator(
>         task_id='task_{0}'.format(i),
>         python_callable=func,
>         dag=dag,
>     )
>     if prev_task:
>         prev_task >> task
>     prev_task = task
>
> if __name__ == "__main__":
>     dag.cli()
> {code}
> I am using the LocalExecutor.
> job_heartbeat_sec = 5
> scheduler_heartbeat_sec = 5
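> For reference, these settings live in airflow.cfg; assuming the default
> section layout of 1.10.x, they correspond to:
> {code}
> [core]
> executor = LocalExecutor
>
> [scheduler]
> job_heartbeat_sec = 5
> scheduler_heartbeat_sec = 5
> {code}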
> Example:
> !image-2019-12-06-13-55-33-974.png|width=398,height=276!
>
> The second-attempt tasks have 2 logs shown on the UI if they were
> successful, and 2 physical log files on disk. However, the tasks that
> failed only have 1 log shown on the UI, despite there being 2 physical log
> files on disk. (Presumably the UI uses the Airflow DB, which for some
> reason isn't aware of the second log for the failed tasks.)
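> Assuming the default log_filename_template of 1.10.x, the two attempts for
> a task show up on disk as, for example:
> {code}
> $AIRFLOW_HOME/logs/bug_testing_dag/task_1/2019-12-01T00:00:00+00:00/1.log
> $AIRFLOW_HOME/logs/bug_testing_dag/task_1/2019-12-01T00:00:00+00:00/2.log
> {code}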
>
> Anyway, I am more interested in the intermittent failures than in which
> logs are shown on the UI.
> Here is an example of the second log file for the Failed task attempts:
> {code:java}
> [2019-12-06 13:40:57,064] {taskinstance.py:624} INFO - Dependencies not met
> for <TaskInstance: bug_testing_dag.task_1 2019-12-01T00:00:00+00:00
> [scheduled]>, dependency 'Task Instance State' FAILED: Task is in the
> 'scheduled' state which is not a valid state for execution. The task must be
> cleared in order to be run.
> [2019-12-06 13:40:57,065] {logging_mixin.py:112} INFO - [2019-12-06
> 13:40:57,065] {local_task_job.py:91} INFO - Task is not able to be run
> [2019-12-06 13:41:09,004] {taskinstance.py:624} INFO - Dependencies not met
> for <TaskInstance: bug_testing_dag.task_1 2019-12-01T00:00:00+00:00
> [failed]>, dependency 'Task Instance State' FAILED: Task is in the 'failed'
> state which is not a valid state for execution. The task must be cleared in
> order to be run.
> [2019-12-06 13:41:09,005] {logging_mixin.py:112} INFO - [2019-12-06
> 13:41:09,005] {local_task_job.py:91} INFO - Task is not able to be run
> {code}
>
> At first I thought this was because the workers were still busy with the
> previous TaskInstance (due to the worker heartbeat, there is a delay between
> when a TaskInstance state is set to SUCCESS and when the worker is actually
> done with it). The scheduler thinks the next task can go SCHEDULED ->
> QUEUED, but it does not start because the worker is still busy, and
> therefore it goes back from QUEUED -> SCHEDULED. The task is still on the
> worker queue, however, causing the failure above when the worker eventually
> tries to start it.
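> To make that suspected sequence concrete, here is a toy model of the race
> in plain Python. This is not Airflow code: a thread and a Queue merely
> stand in for the worker and the executor's task queue, and the state names
> mirror the log above.
> {code:python}
> import queue
> import threading
> import time
>
> state = {'task_1': 'scheduled'}   # hypothetical task instance state
> worker_queue = queue.Queue()      # stands in for the executor's task queue
>
> def scheduler():
>     # The scheduler queues the task...
>     state['task_1'] = 'queued'
>     worker_queue.put('task_1')
>     time.sleep(0.1)
>     # ...then, not seeing it start by the next heartbeat, reverts it.
>     state['task_1'] = 'scheduled'
>
> def worker():
>     time.sleep(0.2)               # still busy with the previous task
>     task = worker_queue.get()     # the stale entry is still on the queue
>     if state[task] != 'queued':
>         # Mirrors: "Task is in the 'scheduled' state which is not a valid
>         # state for execution" from the log above.
>         print("Task is not able to be run (state=%s)" % state[task])
>     else:
>         print("Running %s" % task)
>
> s = threading.Thread(target=scheduler)
> w = threading.Thread(target=worker)
> s.start(); w.start()
> s.join(); w.join()
> {code}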
> However, what is a mystery to me is why it works the first time the dag_run
> runs and not the second time. Perhaps it is something specific to my
> environment.
> I'm going to try to debug this myself, but if anyone else can replicate
> this issue in their environment, it would help me understand whether it is
> just affecting me (or not).
> Just install the DAG, let it run 100% once, then clear it and let it run
> again (you should start seeing random failures); the equivalent CLI steps
> are sketched below.
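> Assuming the 1.10.x CLI command names, the reproduction steps would look
> roughly like this:
> {code}
> airflow pause bug_testing_dag
> airflow clear -c bug_testing_dag
> airflow unpause bug_testing_dag
> {code}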
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)