Kamil Bregula created AIRFLOW-6141:
--------------------------------------

             Summary: The sensor cannot be executed by invalid deps
                 Key: AIRFLOW-6141
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6141
             Project: Apache Airflow
          Issue Type: Bug
          Components: backfill
    Affects Versions: 1.10.6
            Reporter: Kamil Bregula


Hello,

I have the following DAG:
{code:python}
"""Example DAG reproducing a backfill deadlock with a PythonSensor."""

from airflow.contrib.sensors.python_sensor import PythonSensor
from airflow.models import DAG
from airflow.utils.dates import days_ago

dag = DAG(
    dag_id='demo_deadlock_backfill_issues',
    default_args={
        'owner': 'Airflow',
        'start_date': days_ago(2),
    },
    schedule_interval="@daily",
)


def python_callable():
    # Always report "condition not met", so the sensor keeps poking.
    return False


# A single sensor task with no upstream dependencies.
run = PythonSensor(
    task_id='run',
    dag=dag,
    python_callable=python_callable,
)
{code}
 
I executed the following command:
{code:bash}
AIRFLOW__CORE__LOGGING_LEVEL=debug airflow dags backfill demo_deadlock_backfill_issues -s 2019-01-02 -e 2019-01-02 --reset_dagruns
{code}
 I then got the following log:
{code:java}
/opt/airflow/airflow/models/dagbag.py:21: DeprecationWarning: the imp module is deprecated in favour of importlib; see the module's documentation for alternative uses
  import imp
[2019-12-01 19:03:40,197] {settings.py:139} DEBUG - Setting up DB connection pool (PID 2429)
[2019-12-01 19:03:40,198] {settings.py:180} INFO - settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=1800, pid=2429
[2019-12-01 19:03:40,269] {cli_action_loggers.py:41} DEBUG - Adding <function default_action_log at 0x7fb41b4eff28> to pre execution callback
[2019-12-01 19:03:40,344] {cli_action_loggers.py:67} DEBUG - Calling callbacks: [<function default_action_log at 0x7fb41b4eff28>]
[2019-12-01 19:03:40,361] {__init__.py:55} INFO - Using executor SequentialExecutor
[2019-12-01 19:03:40,362] {dagbag.py:401} INFO - Filling up the DagBag from /root/airflow/dags
[..]
You are about to delete these 1 tasks:
<TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [running]>
Are you sure? (yes/no): yes
[2019-12-01 19:03:45,075] {backfill_job.py:396} DEBUG - *** Clearing out not_ready list ***
[2019-12-01 19:03:45,085] {backfill_job.py:414} DEBUG - Task instance to run <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> state shutdown
[2019-12-01 19:03:45,091] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Task Instance Not Running' PASSED: True, Task is not in running state.
[2019-12-01 19:03:45,091] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
[2019-12-01 19:03:45,091] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2019-12-01 19:03:45,092] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Ready To Reschedule' PASSED: True, The task instance is not in State_UP_FOR_RESCHEDULE or NONE state.
[2019-12-01 19:03:45,092] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2019-12-01 19:03:45,095] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Task Instance State' PASSED: False, Task is in the 'shutdown' state which is not a valid state for execution. The task must be cleared in order to be run.
[2019-12-01 19:03:45,095] {taskinstance.py:608} DEBUG - Dependencies not met for <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]>, dependency 'Task Instance State' FAILED: Task is in the 'shutdown' state which is not a valid state for execution. The task must be cleared in order to be run.
[2019-12-01 19:03:45,095] {backfill_job.py:536} DEBUG - Adding <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> to not_ready
[2019-12-01 19:03:50,064] {base_job.py:196} DEBUG - [heartbeat]
[2019-12-01 19:03:50,065] {base_executor.py:121} DEBUG - 0 running task instances
[2019-12-01 19:03:50,070] {base_executor.py:122} DEBUG - 0 in queue
[2019-12-01 19:03:50,071] {base_executor.py:123} DEBUG - 32 open slots
[2019-12-01 19:03:50,072] {base_executor.py:132} DEBUG - Calling the <class 'airflow.executors.sequential_executor.SequentialExecutor'> sync method
[2019-12-01 19:03:50,075] {backfill_job.py:598} WARNING - Deadlock discovered for ti_status.to_run=odict_values([<TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]>])
[2019-12-01 19:03:50,081] {dagrun.py:272} DEBUG - Updating state for <DagRun demo_deadlock_backfill_issues @ 2019-01-02 00:00:00+00:00: backfill_2019-01-02T00:00:00+00:00, externally triggered: False> considering 1 task(s)
[2019-12-01 19:03:50,085] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
[2019-12-01 19:03:50,086] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2019-12-01 19:03:50,086] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Ready To Reschedule' PASSED: True, The context specified that being in a reschedule period was permitted.
[2019-12-01 19:03:50,086] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Not In Retry Period' PASSED: True, The context specified that being in a retry period was permitted.
[2019-12-01 19:03:50,086] {taskinstance.py:614} DEBUG - Dependencies all met for <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]>
[2019-12-01 19:03:50,090] {backfill_job.py:359} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 0 | succeeded: 0 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 1 | not ready: 1
[2019-12-01 19:03:50,090] {backfill_job.py:364} DEBUG - Finished dag run loop iteration. Remaining tasks odict_values([])
[2019-12-01 19:03:50,091] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
[2019-12-01 19:03:50,091] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2019-12-01 19:03:50,091] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Ready To Reschedule' PASSED: True, The task instance is not in State_UP_FOR_RESCHEDULE or NONE state.
[2019-12-01 19:03:50,091] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2019-12-01 19:03:50,092] {taskinstance.py:614} DEBUG - Dependencies all met for <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]>
[2019-12-01 19:03:50,092] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
[2019-12-01 19:03:50,093] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Previous Dagrun State' PASSED: True, The context specified that the state of past DAGs could be ignored.
[2019-12-01 19:03:50,093] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Ready To Reschedule' PASSED: True, The task instance is not in State_UP_FOR_RESCHEDULE or NONE state.
[2019-12-01 19:03:50,094] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2019-12-01 19:03:50,095] {taskinstance.py:614} DEBUG - Dependencies all met for <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]>
[2019-12-01 19:03:50,097] {base_executor.py:121} DEBUG - 0 running task instances
[2019-12-01 19:03:50,097] {base_executor.py:122} DEBUG - 0 in queue
[2019-12-01 19:03:50,098] {base_executor.py:123} DEBUG - 32 open slots
[2019-12-01 19:03:50,099] {base_executor.py:132} DEBUG - Calling the <class 'airflow.executors.sequential_executor.SequentialExecutor'> sync method
[2019-12-01 19:03:50,109] {cli_action_loggers.py:85} DEBUG - Calling callbacks: []
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 7, in <module>
    exec(compile(f.read(), __file__, 'exec'))
  File "/opt/airflow/airflow/bin/airflow", line 39, in <module>
    args.func(args)
  File "/opt/airflow/airflow/bin/cli.py", line 50, in command
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/utils/cli.py", line 80, in wrapper
    return f(*args, **kwargs)
  File "/opt/airflow/airflow/cli/commands/dag_command.py", line 94, in dag_backfill
    run_backwards=args.run_backwards
  File "/opt/airflow/airflow/models/dag.py", line 1276, in run
    job.run()
  File "/opt/airflow/airflow/jobs/base_job.py", line 217, in run
    self._execute()
  File "/opt/airflow/airflow/utils/db.py", line 68, in wrapper
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/jobs/backfill_job.py", line 773, in _execute
    raise AirflowException(err)
airflow.exceptions.AirflowException:
---------------------------------------------------
BackfillJob is deadlocked.
These tasks have succeeded: set()
These tasks are running: {}
These tasks have failed: set()
These tasks are skipped: set()
These tasks are deadlocked: {<TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]>}
{code}
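The one failing dependency check is easy to miss in the DEBUG output above. The snippet below is only a rough sketch for pulling it out of a saved log; `DEP_CHECK` and `failed_dependencies` are hypothetical helper names, not part of Airflow, and the pattern assumes the message format shown in this log.

```python
import re

# Matches dependency checks as they appear in the DEBUG output, e.g.
#   dependency 'Task Instance State' PASSED: False, Task is in the 'shutdown' state ...
# The reason runs until the next "[YYYY-MM-DD " timestamp or the end of the text.
DEP_CHECK = re.compile(
    r"dependency '([^']+)' PASSED: (True|False), (.*?)(?= \[\d{4}-\d{2}-\d{2} |$)"
)


def failed_dependencies(log_text):
    """Return (dependency name, reason) pairs for every check that did not pass."""
    # Collapse hard wraps so an entry split across several lines still matches.
    flat = " ".join(log_text.split())
    return [
        (name, reason)
        for name, passed, reason in DEP_CHECK.findall(flat)
        if passed == "False"
    ]


# Two entries copied from the log above, including a hard wrap.
sample = (
    "[2019-12-01 19:03:45,092] {taskinstance.py:631} DEBUG - <TaskInstance: "
    "demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> "
    "dependency 'Not In Retry Period' PASSED: True, The task instance was not "
    "marked for retrying.\n"
    "[2019-12-01 19:03:45,095] {taskinstance.py:631} DEBUG - <TaskInstance: "
    "demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> "
    "dependency 'Task Instance State' PASSED:\nFalse, Task is in the 'shutdown' "
    "state which is not a valid state for execution. The task must be cleared "
    "in order to be run.\n"
)

for name, reason in failed_dependencies(sample):
    print(name, "->", reason)
```

Applied to the full log, this should report only the 'Task Instance State' check.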
Please note that no attempt is made to start the sensor; the task is blocked 
immediately. This is probably because the task was previously cancelled with 
CTRL+C, which left it in the 'shutdown' state. 
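To make the suspected mechanism concrete, here is a toy model of what the log appears to show. It is not Airflow's implementation: `RUNNABLE_STATES`, `state_dep_met` and `backfill_pass` are hypothetical names, and the set of runnable states is an assumption chosen for illustration.

```python
# Hypothetical stand-in for the states a task instance may be started from.
# This set is an assumption for illustration, not Airflow's actual list.
RUNNABLE_STATES = {None, "scheduled", "queued", "up_for_retry", "up_for_reschedule"}


def state_dep_met(state):
    """Simplified 'Task Instance State' check: only runnable states pass."""
    return state in RUNNABLE_STATES


def backfill_pass(to_run, running):
    """One simplified scheduling pass over {task_id: state}.

    Returns (started, deadlocked) lists of task ids.
    """
    started = [task for task, state in to_run.items() if state_dep_met(state)]
    not_ready = [task for task in to_run if task not in started]
    # If nothing is running and nothing could be started, no later pass can
    # change the outcome, so the remaining tasks are reported as deadlocked.
    deadlocked = not_ready if not running and not started else []
    return started, deadlocked


# The task left in 'shutdown' after CTRL+C never passes the state check.
print(backfill_pass({"run": "shutdown"}, running=[]))
# A task instance with no state would be started normally.
print(backfill_pass({"run": None}, running=[]))
```

In this model the backfill can never make progress for a task left in 'shutdown', which matches the "deadlocked: 1 | not ready: 1" progress line in the log.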



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
