[ https://issues.apache.org/jira/browse/AIRFLOW-6141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16986012#comment-16986012 ]
ASF subversion and git services commented on AIRFLOW-6141:
----------------------------------------------------------

Commit 6b02547cb1800d1e9a06fde47f49862297a8cca1 in airflow's branch refs/heads/master from Kamil Breguła
[ https://gitbox.apache.org/repos/asf?p=airflow.git;h=6b02547 ]

[AIRFLOW-6141] Remove ReadyToRescheduleDep if sensor mode == poke (#6704)


> The sensor cannot be executed by invalid deps
> ---------------------------------------------
>
>                 Key: AIRFLOW-6141
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6141
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: backfill
>    Affects Versions: 1.10.6
>            Reporter: Kamil Bregula
>            Priority: Major
>             Fix For: 1.10.7
>
>
> Hello,
> I have the following DAG:
> {code:python}
> """Example DAG demonstrating the usage of the BranchPythonOperator."""
> import random
> from os import path
> import airflow
> from airflow.contrib.sensors.python_sensor import PythonSensor
> from airflow.models import DAG
> from airflow.operators.dummy_operator import DummyOperator
> from airflow.operators.python_operator import BranchPythonOperator
> dag = DAG(
>     dag_id='demo_deadlock_backfill_issues',
>     default_args={
>         'owner': 'Airflow',
>         'start_date': airflow.utils.dates.days_ago(2),
>     },
>     schedule_interval="@daily",
> )
> DEADLOCK_SHARED_SHARE = "/tmp/deadlock-shared-state"
> def python_callable():
>     return False
> run = PythonSensor(
>     task_id='run',
>     dag=dag,
>     python_callable=python_callable,
> ){code}
>
> I executed the following command:
> {code:bash}
> AIRFLOW__CORE__LOGGING_LEVEL=debug airflow dags backfill demo_deadlock_backfill_issues -s 2019-01-02 -e 2019-01-02 --reset_dagruns
> {code}
> I then got the following log:
> {code:java}
> /opt/airflow/airflow/models/dagbag.py:21: DeprecationWarning: the imp module is deprecated in favour of importlib; see the module's documentation for alternative uses
>   import imp
> [2019-12-01 19:03:40,197] {settings.py:139} DEBUG - Setting up DB connection pool (PID 2429)
> [2019-12-01 19:03:40,198] {settings.py:180} INFO - settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=1800, pid=2429
> [2019-12-01 19:03:40,269] {cli_action_loggers.py:41} DEBUG - Adding <function default_action_log at 0x7fb41b4eff28> to pre execution callback
> [2019-12-01 19:03:40,344] {cli_action_loggers.py:67} DEBUG - Calling callbacks: [<function default_action_log at 0x7fb41b4eff28>]
> [2019-12-01 19:03:40,361] {__init__.py:55} INFO - Using executor SequentialExecutor
> [2019-12-01 19:03:40,362] {dagbag.py:401} INFO - Filling up the DagBag from /root/airflow/dags
> [..]
> You are about to delete these 1 tasks:
> <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [running]>
> Are you sure? (yes/no): yes
> [2019-12-01 19:03:45,075] {backfill_job.py:396} DEBUG - *** Clearing out not_ready list ***
> [2019-12-01 19:03:45,085] {backfill_job.py:414} DEBUG - Task instance to run <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> state shutdown
> [2019-12-01 19:03:45,091] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Task Instance Not Running' PASSED: True, Task is not in running state.
> [2019-12-01 19:03:45,091] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
> [2019-12-01 19:03:45,091] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
> [2019-12-01 19:03:45,092] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Ready To Reschedule' PASSED: True, The task instance is not in State_UP_FOR_RESCHEDULE or NONE state.
> [2019-12-01 19:03:45,092] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
> [2019-12-01 19:03:45,095] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Task Instance State' PASSED: False, Task is in the 'shutdown' state which is not a valid state for execution. The task must be cleared in order to be run.
> [2019-12-01 19:03:45,095] {taskinstance.py:608} DEBUG - Dependencies not met for <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]>, dependency 'Task Instance State' FAILED: Task is in the 'shutdown' state which is not a valid state for execution. The task must be cleared in order to be run.
> [2019-12-01 19:03:45,095] {backfill_job.py:536} DEBUG - Adding <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> to not_ready
> [2019-12-01 19:03:50,064] {base_job.py:196} DEBUG - [heartbeat]
> [2019-12-01 19:03:50,065] {base_executor.py:121} DEBUG - 0 running task instances
> [2019-12-01 19:03:50,070] {base_executor.py:122} DEBUG - 0 in queue
> [2019-12-01 19:03:50,071] {base_executor.py:123} DEBUG - 32 open slots
> [2019-12-01 19:03:50,072] {base_executor.py:132} DEBUG - Calling the <class 'airflow.executors.sequential_executor.SequentialExecutor'> sync method
> [2019-12-01 19:03:50,075] {backfill_job.py:598} WARNING - Deadlock discovered for ti_status.to_run=odict_values([<TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]>])
> [2019-12-01 19:03:50,081] {dagrun.py:272} DEBUG - Updating state for <DagRun demo_deadlock_backfill_issues @ 2019-01-02 00:00:00+00:00: backfill_2019-01-02T00:00:00+00:00, externally triggered: False> considering 1 task(s)
> [2019-12-01 19:03:50,085] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
> [2019-12-01 19:03:50,086] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
> [2019-12-01 19:03:50,086] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Ready To Reschedule' PASSED: True, The context specified that being in a reschedule period was permitted.
> [2019-12-01 19:03:50,086] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Not In Retry Period' PASSED: True, The context specified that being in a retry period was permitted.
> [2019-12-01 19:03:50,086] {taskinstance.py:614} DEBUG - Dependencies all met for <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]>
> [2019-12-01 19:03:50,090] {backfill_job.py:359} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 0 | succeeded: 0 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 1 | not ready: 1
> [2019-12-01 19:03:50,090] {backfill_job.py:364} DEBUG - Finished dag run loop iteration. Remaining tasks odict_values([])
> [2019-12-01 19:03:50,091] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
> [2019-12-01 19:03:50,091] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
> [2019-12-01 19:03:50,091] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Ready To Reschedule' PASSED: True, The task instance is not in State_UP_FOR_RESCHEDULE or NONE state.
> [2019-12-01 19:03:50,091] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
> [2019-12-01 19:03:50,092] {taskinstance.py:614} DEBUG - Dependencies all met for <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]>
> [2019-12-01 19:03:50,092] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
> [2019-12-01 19:03:50,093] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Previous Dagrun State' PASSED: True, The context specified that the state of past DAGs could be ignored.
> [2019-12-01 19:03:50,093] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Ready To Reschedule' PASSED: True, The task instance is not in State_UP_FOR_RESCHEDULE or NONE state.
> [2019-12-01 19:03:50,094] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
> [2019-12-01 19:03:50,095] {taskinstance.py:614} DEBUG - Dependencies all met for <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]>
> [2019-12-01 19:03:50,097] {base_executor.py:121} DEBUG - 0 running task instances
> [2019-12-01 19:03:50,097] {base_executor.py:122} DEBUG - 0 in queue
> [2019-12-01 19:03:50,098] {base_executor.py:123} DEBUG - 32 open slots
> [2019-12-01 19:03:50,099] {base_executor.py:132} DEBUG - Calling the <class 'airflow.executors.sequential_executor.SequentialExecutor'> sync method
> [2019-12-01 19:03:50,109] {cli_action_loggers.py:85} DEBUG - Calling callbacks: []
> Traceback (most recent call last):
>   File "/usr/local/bin/airflow", line 7, in <module>
>     exec(compile(f.read(), __file__, 'exec'))
>   File "/opt/airflow/airflow/bin/airflow", line 39, in <module>
>     args.func(args)
>   File "/opt/airflow/airflow/bin/cli.py", line 50, in command
>     return func(*args, **kwargs)
>   File "/opt/airflow/airflow/utils/cli.py", line 80, in wrapper
>     return f(*args, **kwargs)
>   File "/opt/airflow/airflow/cli/commands/dag_command.py", line 94, in dag_backfill
>     run_backwards=args.run_backwards
>   File "/opt/airflow/airflow/models/dag.py", line 1276, in run
>     job.run()
>   File "/opt/airflow/airflow/jobs/base_job.py", line 217, in run
>     self._execute()
>   File "/opt/airflow/airflow/utils/db.py", line 68, in wrapper
>     return func(*args, **kwargs)
>   File "/opt/airflow/airflow/jobs/backfill_job.py", line 773, in _execute
>     raise AirflowException(err)
> airflow.exceptions.AirflowException:
> ---------------------------------------------------
> BackfillJob is deadlocked.
> These tasks have succeeded: set()
> These tasks are running: {}
> These tasks have failed: set()
> These tasks are skipped: set()
> These tasks are deadlocked: {<TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]>}{code}
> Please note that no attempt is made to start the sensor. The task is blocked immediately.
> This is probably because the task was previously canceled with CTRL+C.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
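
The commit title quoted in this notification ("Remove ReadyToRescheduleDep if sensor mode == poke") states the fix in a single line. As a rough illustration of that idea only, here is a minimal sketch in plain Python. It does not import Airflow and is not the code from the commit: SketchSensor, BASE_DEPS and RESCHEDULE_DEP are hypothetical stand-ins, and the dependency names are just the labels that appear in the log quoted in the issue.

```python
# Stand-in dependency labels, copied from the log in the issue description.
# They are plain strings here, not Airflow dependency objects.
BASE_DEPS = frozenset({
    "Trigger Rule",
    "Previous Dagrun State",
    "Not In Retry Period",
})
RESCHEDULE_DEP = "Ready To Reschedule"


class SketchSensor:
    """Hypothetical, simplified stand-in for a sensor operator."""

    def __init__(self, mode="poke"):
        if mode not in ("poke", "reschedule"):
            raise ValueError(f"unsupported mode: {mode!r}")
        self.mode = mode

    @property
    def reschedule(self):
        # True only when the sensor frees its slot between checks.
        return self.mode == "reschedule"

    @property
    def deps(self):
        # Attach the reschedule check only when the sensor can actually
        # be rescheduled; a poke-mode sensor keeps just the base checks.
        if self.reschedule:
            return BASE_DEPS | {RESCHEDULE_DEP}
        return BASE_DEPS


if __name__ == "__main__":
    for mode in ("poke", "reschedule"):
        print(mode, sorted(SketchSensor(mode=mode).deps))
```

Under these assumptions, a sensor created with the default mode is never evaluated against the reschedule check, while a sensor in reschedule mode still is.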