[
https://issues.apache.org/jira/browse/AIRFLOW-6141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Kaxil Naik resolved AIRFLOW-6141.
---------------------------------
Fix Version/s: 1.10.7
Resolution: Fixed
> The sensor cannot be executed by invalid deps
> ---------------------------------------------
>
> Key: AIRFLOW-6141
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6141
> Project: Apache Airflow
> Issue Type: Bug
> Components: backfill
> Affects Versions: 1.10.6
> Reporter: Kamil Bregula
> Priority: Major
> Fix For: 1.10.7
>
>
> Hello,
> I have the following DAG:
> {code:python}
> """Example DAG demonstrating the usage of the BranchPythonOperator."""
> import random
> from os import path
>
> import airflow
> from airflow.contrib.sensors.python_sensor import PythonSensor
> from airflow.models import DAG
> from airflow.operators.dummy_operator import DummyOperator
> from airflow.operators.python_operator import BranchPythonOperator
>
> dag = DAG(
>     dag_id='demo_deadlock_backfill_issues',
>     default_args={
>         'owner': 'Airflow',
>         'start_date': airflow.utils.dates.days_ago(2),
>     },
>     schedule_interval="@daily",
> )
>
> DEADLOCK_SHARED_SHARE = "/tmp/deadlock-shared-state"
>
>
> def python_callable():
>     return False
>
>
> run = PythonSensor(
>     task_id='run',
>     dag=dag,
>     python_callable=python_callable,
> )
> {code}
>
> I executed the following command:
> {code:bash}
> AIRFLOW__CORE__LOGGING_LEVEL=debug airflow dags backfill \
>   demo_deadlock_backfill_issues -s 2019-01-02 -e 2019-01-02 --reset_dagruns
> {code}
> I then got the following log:
> {code:java}
> /opt/airflow/airflow/models/dagbag.py:21: DeprecationWarning: the imp module is deprecated in favour of importlib; see the module's documentation for alternative uses
>   import imp
> [2019-12-01 19:03:40,197] {settings.py:139} DEBUG - Setting up DB connection pool (PID 2429)
> [2019-12-01 19:03:40,198] {settings.py:180} INFO - settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=1800, pid=2429
> [2019-12-01 19:03:40,269] {cli_action_loggers.py:41} DEBUG - Adding <function default_action_log at 0x7fb41b4eff28> to pre execution callback
> [2019-12-01 19:03:40,344] {cli_action_loggers.py:67} DEBUG - Calling callbacks: [<function default_action_log at 0x7fb41b4eff28>]
> [2019-12-01 19:03:40,361] {__init__.py:55} INFO - Using executor SequentialExecutor
> [2019-12-01 19:03:40,362] {dagbag.py:401} INFO - Filling up the DagBag from /root/airflow/dags
> [..]
> You are about to delete these 1 tasks:
> <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [running]>
> Are you sure? (yes/no): yes
> [2019-12-01 19:03:45,075] {backfill_job.py:396} DEBUG - *** Clearing out not_ready list ***
> [2019-12-01 19:03:45,085] {backfill_job.py:414} DEBUG - Task instance to run <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> state shutdown
> [2019-12-01 19:03:45,091] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Task Instance Not Running' PASSED: True, Task is not in running state.
> [2019-12-01 19:03:45,091] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
> [2019-12-01 19:03:45,091] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
> [2019-12-01 19:03:45,092] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Ready To Reschedule' PASSED: True, The task instance is not in State_UP_FOR_RESCHEDULE or NONE state.
> [2019-12-01 19:03:45,092] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
> [2019-12-01 19:03:45,095] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Task Instance State' PASSED: False, Task is in the 'shutdown' state which is not a valid state for execution. The task must be cleared in order to be run.
> [2019-12-01 19:03:45,095] {taskinstance.py:608} DEBUG - Dependencies not met for <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]>, dependency 'Task Instance State' FAILED: Task is in the 'shutdown' state which is not a valid state for execution. The task must be cleared in order to be run.
> [2019-12-01 19:03:45,095] {backfill_job.py:536} DEBUG - Adding <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> to not_ready
> [2019-12-01 19:03:50,064] {base_job.py:196} DEBUG - [heartbeat]
> [2019-12-01 19:03:50,065] {base_executor.py:121} DEBUG - 0 running task instances
> [2019-12-01 19:03:50,070] {base_executor.py:122} DEBUG - 0 in queue
> [2019-12-01 19:03:50,071] {base_executor.py:123} DEBUG - 32 open slots
> [2019-12-01 19:03:50,072] {base_executor.py:132} DEBUG - Calling the <class 'airflow.executors.sequential_executor.SequentialExecutor'> sync method
> [2019-12-01 19:03:50,075] {backfill_job.py:598} WARNING - Deadlock discovered for ti_status.to_run=odict_values([<TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]>])
> [2019-12-01 19:03:50,081] {dagrun.py:272} DEBUG - Updating state for <DagRun demo_deadlock_backfill_issues @ 2019-01-02 00:00:00+00:00: backfill_2019-01-02T00:00:00+00:00, externally triggered: False> considering 1 task(s)
> [2019-12-01 19:03:50,085] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
> [2019-12-01 19:03:50,086] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
> [2019-12-01 19:03:50,086] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Ready To Reschedule' PASSED: True, The context specified that being in a reschedule period was permitted.
> [2019-12-01 19:03:50,086] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Not In Retry Period' PASSED: True, The context specified that being in a retry period was permitted.
> [2019-12-01 19:03:50,086] {taskinstance.py:614} DEBUG - Dependencies all met for <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]>
> [2019-12-01 19:03:50,090] {backfill_job.py:359} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 0 | succeeded: 0 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 1 | not ready: 1
> [2019-12-01 19:03:50,090] {backfill_job.py:364} DEBUG - Finished dag run loop iteration. Remaining tasks odict_values([])
> [2019-12-01 19:03:50,091] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
> [2019-12-01 19:03:50,091] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
> [2019-12-01 19:03:50,091] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Ready To Reschedule' PASSED: True, The task instance is not in State_UP_FOR_RESCHEDULE or NONE state.
> [2019-12-01 19:03:50,091] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
> [2019-12-01 19:03:50,092] {taskinstance.py:614} DEBUG - Dependencies all met for <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]>
> [2019-12-01 19:03:50,092] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
> [2019-12-01 19:03:50,093] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Previous Dagrun State' PASSED: True, The context specified that the state of past DAGs could be ignored.
> [2019-12-01 19:03:50,093] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Ready To Reschedule' PASSED: True, The task instance is not in State_UP_FOR_RESCHEDULE or NONE state.
> [2019-12-01 19:03:50,094] {taskinstance.py:631} DEBUG - <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
> [2019-12-01 19:03:50,095] {taskinstance.py:614} DEBUG - Dependencies all met for <TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]>
> [2019-12-01 19:03:50,097] {base_executor.py:121} DEBUG - 0 running task instances
> [2019-12-01 19:03:50,097] {base_executor.py:122} DEBUG - 0 in queue
> [2019-12-01 19:03:50,098] {base_executor.py:123} DEBUG - 32 open slots
> [2019-12-01 19:03:50,099] {base_executor.py:132} DEBUG - Calling the <class 'airflow.executors.sequential_executor.SequentialExecutor'> sync method
> [2019-12-01 19:03:50,109] {cli_action_loggers.py:85} DEBUG - Calling callbacks: []
> Traceback (most recent call last):
>   File "/usr/local/bin/airflow", line 7, in <module>
>     exec(compile(f.read(), __file__, 'exec'))
>   File "/opt/airflow/airflow/bin/airflow", line 39, in <module>
>     args.func(args)
>   File "/opt/airflow/airflow/bin/cli.py", line 50, in command
>     return func(*args, **kwargs)
>   File "/opt/airflow/airflow/utils/cli.py", line 80, in wrapper
>     return f(*args, **kwargs)
>   File "/opt/airflow/airflow/cli/commands/dag_command.py", line 94, in dag_backfill
>     run_backwards=args.run_backwards
>   File "/opt/airflow/airflow/models/dag.py", line 1276, in run
>     job.run()
>   File "/opt/airflow/airflow/jobs/base_job.py", line 217, in run
>     self._execute()
>   File "/opt/airflow/airflow/utils/db.py", line 68, in wrapper
>     return func(*args, **kwargs)
>   File "/opt/airflow/airflow/jobs/backfill_job.py", line 773, in _execute
>     raise AirflowException(err)
> airflow.exceptions.AirflowException:
> ---------------------------------------------------
> BackfillJob is deadlocked.
> These tasks have succeeded: set()
> These tasks are running: {}
> These tasks have failed: set()
> These tasks are skipped: set()
> These tasks are deadlocked: {<TaskInstance: demo_deadlock_backfill_issues.run 2019-01-02 00:00:00+00:00 [shutdown]>}
> {code}
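> Please note that no attempt is made to start the sensor; the task is blocked
> immediately. This is probably because the task was previously cancelled with
> CTRL+C: after the reset, the task instance is in the 'shutdown' state, which
> fails the 'Task Instance State' dependency.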
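
The behaviour in the log can be reproduced with a toy model. The sketch below is not Airflow code: the function names and the set of runnable states are assumptions made for illustration, and only the 'shutdown' rejection message is taken from the log. It shows how a single task whose state fails the state check leaves the backfill with nothing to run, which is reported as a deadlock.

```python
"""Toy model of the dependency check seen in the log (not Airflow code)."""

# Assumption: states from which a task instance is allowed to start.
# The log only tells us that 'shutdown' is NOT one of them.
RUNNABLE_STATES = {None, "scheduled", "queued", "up_for_retry", "up_for_reschedule"}


def task_instance_state_dep(state):
    """Return (passed, reason), mirroring the 'Task Instance State' log line."""
    if state in RUNNABLE_STATES:
        return True, "Task state {!r} is valid for execution.".format(state)
    return False, (
        "Task is in the {!r} state which is not a valid state for execution. "
        "The task must be cleared in order to be run.".format(state)
    )


def backfill_iteration(task_states):
    """Split tasks into runnable and not-ready; flag a deadlock if none can run."""
    runnable, not_ready = [], []
    for task_id, state in task_states.items():
        passed, _reason = task_instance_state_dep(state)
        (runnable if passed else not_ready).append(task_id)
    # Hypothetical deadlock rule: tasks remain, but none of them may start.
    deadlocked = bool(not_ready) and not runnable
    return runnable, not_ready, deadlocked


# The sensor left in 'shutdown' after CTRL+C: nothing can run.
print(backfill_iteration({"run": "shutdown"}))
# The same sensor with its state cleared: it becomes runnable.
print(backfill_iteration({"run": None}))
```

In this model, clearing the task state (setting it to `None`) is enough for the task to become runnable, which matches the log message saying the task must be cleared in order to be run.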