Kamil Bregula created AIRFLOW-6141:
--------------------------------------
Summary: The sensor cannot be executed due to invalid deps
Key: AIRFLOW-6141
URL: https://issues.apache.org/jira/browse/AIRFLOW-6141
Project: Apache Airflow
Issue Type: Bug
Components: backfill
Affects Versions: 1.10.6
Reporter: Kamil Bregula
Hello,
I have the following DAG:
{code:python}
"""Example DAG demonstrating a backfill deadlock with a PythonSensor."""
import airflow
from airflow.contrib.sensors.python_sensor import PythonSensor
from airflow.models import DAG

dag = DAG(
    dag_id='demo_deadlock_backfill_issues',
    default_args={
        'owner': 'Airflow',
        'start_date': airflow.utils.dates.days_ago(2),
    },
    schedule_interval="@daily",
)


def python_callable():
    # Never succeeds, so the sensor keeps poking.
    return False


run = PythonSensor(
    task_id='run',
    dag=dag,
    python_callable=python_callable,
){code}
I executed the following command:
{code:bash}
AIRFLOW__CORE__LOGGING_LEVEL=debug airflow dags backfill demo_deadlock_backfill_issues -s 2019-01-02 -e 2019-01-02 --reset_dagruns
{code}
I then got the following log:
{code:none}
/opt/airflow/airflow/models/dagbag.py:21: DeprecationWarning: the imp module is deprecated in favour of importlib; see the module's documentation for alternative uses
  import imp
[2019-12-01 19:03:40,197] {settings.py:139} DEBUG - Setting up DB connection pool (PID 2429)
[2019-12-01 19:03:40,198] {settings.py:180} INFO - settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=1800, pid=2429
[2019-12-01 19:03:40,269] {cli_action_loggers.py:41} DEBUG - Adding <function default_action_log at 0x7fb41b4eff28> to pre execution callback
[2019-12-01 19:03:40,344] {cli_action_loggers.py:67} DEBUG - Calling callbacks: [<function default_action_log at 0x7fb41b4eff28>]
[2019-12-01 19:03:40,361] {__init__.py:55} INFO - Using executor SequentialExecutor
[2019-12-01 19:03:40,362] {dagbag.py:401} INFO - Filling up the DagBag from /root/airflow/dags
[..]
You are about to delete these 1 tasks:
<TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [running]>
Are you sure? (yes/no): yes
[2019-12-01 19:03:45,075] {backfill_job.py:396} DEBUG - *** Clearing out not_ready list ***
[2019-12-01 19:03:45,085] {backfill_job.py:414} DEBUG - Task instance to run <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> state shutdown
[2019-12-01 19:03:45,091] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Task Instance Not Running' PASSED: True, Task is not in running state.
[2019-12-01 19:03:45,091] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
[2019-12-01 19:03:45,091] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2019-12-01 19:03:45,092] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Ready To Reschedule' PASSED: True, The task instance is not in State_UP_FOR_RESCHEDULE or NONE state.
[2019-12-01 19:03:45,092] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2019-12-01 19:03:45,095] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Task Instance State' PASSED: False, Task is in the 'shutdown' state which is not a valid state for execution. The task must be cleared in order to be run.
[2019-12-01 19:03:45,095] {taskinstance.py:608} DEBUG - Dependencies not met for <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]>, dependency 'Task Instance State' FAILED: Task is in the 'shutdown' state which is not a valid state for execution. The task must be cleared in order to be run.
[2019-12-01 19:03:45,095] {backfill_job.py:536} DEBUG - Adding <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> to not_ready
[2019-12-01 19:03:50,064] {base_job.py:196} DEBUG - [heartbeat]
[2019-12-01 19:03:50,065] {base_executor.py:121} DEBUG - 0 running task instances
[2019-12-01 19:03:50,070] {base_executor.py:122} DEBUG - 0 in queue
[2019-12-01 19:03:50,071] {base_executor.py:123} DEBUG - 32 open slots
[2019-12-01 19:03:50,072] {base_executor.py:132} DEBUG - Calling the <class 'airflow.executors.sequential_executor.SequentialExecutor'> sync method
[2019-12-01 19:03:50,075] {backfill_job.py:598} WARNING - Deadlock discovered for ti_status.to_run=odict_values([<TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]>])
[2019-12-01 19:03:50,081] {dagrun.py:272} DEBUG - Updating state for <DagRun demo_deadlock_backfill_issues @ 2019-01-02 00:00:00+00:00: backfill_2019-01-02T00:00:00+00:00, externally triggered: False> considering 1 task(s)
[2019-12-01 19:03:50,085] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
[2019-12-01 19:03:50,086] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2019-12-01 19:03:50,086] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Ready To Reschedule' PASSED: True, The context specified that being in a reschedule period was permitted.
[2019-12-01 19:03:50,086] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Not In Retry Period' PASSED: True, The context specified that being in a retry period was permitted.
[2019-12-01 19:03:50,086] {taskinstance.py:614} DEBUG - Dependencies all met for <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]>
[2019-12-01 19:03:50,090] {backfill_job.py:359} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 0 | succeeded: 0 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 1 | not ready: 1
[2019-12-01 19:03:50,090] {backfill_job.py:364} DEBUG - Finished dag run loop iteration. Remaining tasks odict_values([])
[2019-12-01 19:03:50,091] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
[2019-12-01 19:03:50,091] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2019-12-01 19:03:50,091] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Ready To Reschedule' PASSED: True, The task instance is not in State_UP_FOR_RESCHEDULE or NONE state.
[2019-12-01 19:03:50,091] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2019-12-01 19:03:50,092] {taskinstance.py:614} DEBUG - Dependencies all met for <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]>
[2019-12-01 19:03:50,092] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
[2019-12-01 19:03:50,093] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Previous Dagrun State' PASSED: True, The context specified that the state of past DAGs could be ignored.
[2019-12-01 19:03:50,093] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Ready To Reschedule' PASSED: True, The task instance is not in State_UP_FOR_RESCHEDULE or NONE state.
[2019-12-01 19:03:50,094] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2019-12-01 19:03:50,095] {taskinstance.py:614} DEBUG - Dependencies all met for <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]>
[2019-12-01 19:03:50,097] {base_executor.py:121} DEBUG - 0 running task instances
[2019-12-01 19:03:50,097] {base_executor.py:122} DEBUG - 0 in queue
[2019-12-01 19:03:50,098] {base_executor.py:123} DEBUG - 32 open slots
[2019-12-01 19:03:50,099] {base_executor.py:132} DEBUG - Calling the <class 'airflow.executors.sequential_executor.SequentialExecutor'> sync method
[2019-12-01 19:03:50,109] {cli_action_loggers.py:85} DEBUG - Calling callbacks: []
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 7, in <module>
    exec(compile(f.read(), __file__, 'exec'))
  File "/opt/airflow/airflow/bin/airflow", line 39, in <module>
    args.func(args)
  File "/opt/airflow/airflow/bin/cli.py", line 50, in command
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/utils/cli.py", line 80, in wrapper
    return f(*args, **kwargs)
  File "/opt/airflow/airflow/cli/commands/dag_command.py", line 94, in dag_backfill
    run_backwards=args.run_backwards
  File "/opt/airflow/airflow/models/dag.py", line 1276, in run
    job.run()
  File "/opt/airflow/airflow/jobs/base_job.py", line 217, in run
    self._execute()
  File "/opt/airflow/airflow/utils/db.py", line 68, in wrapper
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/jobs/backfill_job.py", line 773, in _execute
    raise AirflowException(err)
airflow.exceptions.AirflowException: ---------------------------------------------------
BackfillJob is deadlocked.
These tasks have succeeded: set()
These tasks are running: {}
These tasks have failed: set()
These tasks are skipped: set()
These tasks are deadlocked: {<TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]>}{code}
Please note that no attempt is made to start the sensor; the task is blocked immediately. This is probably because the task was previously cancelled with CTRL+C, which left it in the 'shutdown' state.
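The mechanism visible in the log can be sketched as a minimal, hypothetical simulation (simplified stand-ins, not Airflow's actual classes or state set): a task whose state fails the 'Task Instance State' dependency is never executed, but it is also never removed from the backfill's set of tasks to run, so an iteration that makes no progress ends in the "BackfillJob is deadlocked" error.

```python
# Hypothetical, simplified sketch of the deadlock described above.
RUNNABLE_STATES = {"scheduled", "queued", None}


def deps_met(state):
    # Mirrors the 'Task Instance State' dependency: 'shutdown' is not a
    # valid state for execution, so the check fails for that task.
    return state in RUNNABLE_STATES


def backfill_loop(tasks):
    to_run = dict(tasks)  # task_id -> state
    while to_run:
        ran_any = False
        for task_id, state in list(to_run.items()):
            if deps_met(state):
                to_run.pop(task_id)  # task executed and removed
                ran_any = True
        if not ran_any:
            # No task could run and none finished: declared a deadlock.
            raise RuntimeError(f"BackfillJob is deadlocked: {to_run}")


try:
    backfill_loop({"run": "shutdown"})
except RuntimeError as err:
    print(err)
```

Under this reading, the backfill would have to either clear the 'shutdown' task instance before scheduling it (as the dependency message itself suggests) or drop it from the to_run set instead of looping on it.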
--
This message was sent by Atlassian Jira
(v8.3.4#803005)