UnightSun opened a new issue #18977:
URL: https://github.com/apache/airflow/issues/18977
### Apache Airflow version
2.1.2
### Operating System
CentOS Linux 8 (Core)
### Versions of Apache Airflow Providers
```
Providers info
apache-airflow-providers-celery | 2.0.0
apache-airflow-providers-ftp | 2.0.0
apache-airflow-providers-http | 2.0.0
apache-airflow-providers-imap | 2.0.0
apache-airflow-providers-microsoft-mssql | 2.0.0
apache-airflow-providers-mysql | 2.1.0
apache-airflow-providers-redis | 2.0.0
apache-airflow-providers-sftp | 2.1.0
apache-airflow-providers-sqlite | 2.0.0
apache-airflow-providers-ssh | 2.1.0
```
### Deployment
Virtualenv installation
### Deployment details
python_version | 3.6.8 (default, Aug 24 2020, 17:57:11) [GCC 8.3.1
20191121 (Red Hat 8.3.1-5)]
### What happened
```python
from airflow import DAG
from airflow.exceptions import AirflowSkipException
from airflow.operators.python import PythonOperator

with DAG(...) as dag:
    def raise_():
        raise AirflowSkipException('skip')

    skip = PythonOperator(
        task_id='skip',
        python_callable=raise_,
    )
    t1 = PythonOperator(
        ...
        python_callable=lambda: 1,
    )
    t2 = PythonOperator(
        ...
        python_callable=lambda: 1,
    )
    skip >> t1 >> t2

if __name__ == '__main__':
    # export AIRFLOW__CORE__EXECUTOR=DebugExecutor
    from datetime import datetime, timedelta

    from airflow.utils.state import State

    dag.clear(dag_run_state=State.NONE)
    today = datetime.today()
    start = datetime(today.year, today.month, today.day) + timedelta(days=0)
    dag.run(start_date=start)
```
prints:
```bash
[2021-10-14 07:06:07,528] {settings.py:208} DEBUG - Setting up DB connection
pool (PID 1013165)
[2021-10-14 07:06:08,239] {executor_loader.py:82} DEBUG - Loading core
executor: DebugExecutor
[2021-10-14 07:06:08,355] {taskinstance.py:702} DEBUG - Setting task state
for <TaskInstance: test_dag.skip 2021-10-14 04:00:00+00:00 [None]> to scheduled
[2021-10-14 07:06:08,360] {taskinstance.py:702} DEBUG - Setting task state
for <TaskInstance: test_dag.t1 2021-10-14 04:00:00+00:00 [None]> to scheduled
[2021-10-14 07:06:08,363] {taskinstance.py:702} DEBUG - Setting task state
for <TaskInstance: test_dag.t2 2021-10-14 04:00:00+00:00 [None]> to scheduled
[2021-10-14 07:06:08,377] {backfill_job.py:423} DEBUG - *** Clearing out
not_ready list ***
[2021-10-14 07:06:08,384] {taskinstance.py:614} DEBUG - Refreshing
TaskInstance <TaskInstance: test_dag.skip 2021-10-14 04:00:00+00:00
[scheduled]> from DB
[2021-10-14 07:06:08,390] {taskinstance.py:649} DEBUG - Refreshed
TaskInstance <TaskInstance: test_dag.skip 2021-10-14 04:00:00+00:00 [scheduled]>
[2021-10-14 07:06:08,391] {backfill_job.py:440} DEBUG - Task instance to run
<TaskInstance: test_dag.skip 2021-10-14 04:00:00+00:00 [scheduled]> state
scheduled
[2021-10-14 07:06:08,391] {taskinstance.py:911} DEBUG - <TaskInstance:
test_dag.skip 2021-10-14 04:00:00+00:00 [scheduled]> dependency 'Not In Retry
Period' PASSED: True, The task instance was not marked for retrying.
[2021-10-14 07:06:08,394] {taskinstance.py:911} DEBUG - <TaskInstance:
test_dag.skip 2021-10-14 04:00:00+00:00 [scheduled]> dependency 'Trigger Rule'
PASSED: True, The task instance did not have any upstream tasks.
[2021-10-14 07:06:08,394] {taskinstance.py:911} DEBUG - <TaskInstance:
test_dag.skip 2021-10-14 04:00:00+00:00 [scheduled]> dependency 'Task Instance
Not Running' PASSED: True, Task is not in running state.
[2021-10-14 07:06:08,394] {taskinstance.py:911} DEBUG - <TaskInstance:
test_dag.skip 2021-10-14 04:00:00+00:00 [scheduled]> dependency 'Task Instance
State' PASSED: True, Task state scheduled was valid.
[2021-10-14 07:06:08,394] {taskinstance.py:911} DEBUG - <TaskInstance:
test_dag.skip 2021-10-14 04:00:00+00:00 [scheduled]> dependency 'Previous
Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2021-10-14 07:06:08,399] {taskinstance.py:896} DEBUG - Dependencies all met
for <TaskInstance: test_dag.skip 2021-10-14 04:00:00+00:00 [scheduled]>
[2021-10-14 07:06:08,399] {backfill_job.py:501} DEBUG - Sending
<TaskInstance: test_dag.skip 2021-10-14 04:00:00+00:00 [scheduled]> to executor
[2021-10-14 07:06:08,403] {base_executor.py:82} INFO - Adding to queue:
['<TaskInstance: test_dag.skip 2021-10-14 04:00:00+00:00 [queued]>']
[2021-10-14 07:06:08,418] {backfill_job.py:598} DEBUG - Not scheduling since
DAG concurrency limit is reached.
[2021-10-14 07:06:13,278] {base_job.py:227} DEBUG - [heartbeat]
[2021-10-14 07:06:13,278] {base_executor.py:150} DEBUG - 0 running task
instances
[2021-10-14 07:06:13,280] {base_executor.py:151} DEBUG - 1 in queue
[2021-10-14 07:06:13,280] {base_executor.py:152} DEBUG - 32 open slots
[2021-10-14 07:06:13,280] {base_executor.py:161} DEBUG - Calling the <class
'airflow.executors.debug_executor.DebugExecutor'> sync method
[2021-10-14 07:06:13,282] {debug_executor.py:75} DEBUG - Executing task:
<TaskInstance: test_dag.skip 2021-10-14 04:00:00+00:00 [queued]>
[2021-10-14 07:06:13,282] {taskinstance.py:614} DEBUG - Refreshing
TaskInstance <TaskInstance: test_dag.skip 2021-10-14 04:00:00+00:00 [queued]>
from DB
[2021-10-14 07:06:13,287] {taskinstance.py:649} DEBUG - Refreshed
TaskInstance <TaskInstance: test_dag.skip 2021-10-14 04:00:00+00:00 [queued]>
[2021-10-14 07:06:13,306] {plugins_manager.py:281} DEBUG - Loading plugins
[2021-10-14 07:06:13,306] {plugins_manager.py:225} DEBUG - Loading plugins
from directory: .../airflow-script/plugins
[2021-10-14 07:06:13,477] {plugins_manager.py:240} DEBUG - Importing plugin
module .../airflow-script/plugins/macros.py
[2021-10-14 07:06:13,479] {plugins_manager.py:205} DEBUG - Loading plugins
from entrypoints
[2021-10-14 07:06:13,494] {plugins_manager.py:297} DEBUG - Loading 1
plugin(s) took 0.19 seconds
[2021-10-14 07:06:13,494] {plugins_manager.py:418} DEBUG - Integrate DAG
plugins
[2021-10-14 07:06:13,494] {plugins_manager.py:255} DEBUG - Creating module
airflow.macros.xxxx
[2021-10-14 07:06:13,502] {taskinstance.py:677} DEBUG - Clearing XCom data
[2021-10-14 07:06:13,505] {taskinstance.py:684} DEBUG - XCom data cleared
[2021-10-14 07:06:13,518] {taskinstance.py:1302} INFO - Exporting the
following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=test_dag
AIRFLOW_CTX_TASK_ID=skip
AIRFLOW_CTX_EXECUTION_DATE=2021-10-14T04:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=backfill__2021-10-14T04:00:00+00:00
[2021-10-14 07:06:13,518] {taskinstance.py:1167} INFO - skip
[2021-10-14 07:06:13,519] {taskinstance.py:614} DEBUG - Refreshing
TaskInstance <TaskInstance: test_dag.skip 2021-10-14 04:00:00+00:00 [queued]>
from DB
[2021-10-14 07:06:13,524] {taskinstance.py:649} DEBUG - Refreshed
TaskInstance <TaskInstance: test_dag.skip 2021-10-14 04:00:00+00:00 [queued]>
[2021-10-14 07:06:13,525] {taskinstance.py:1177} INFO - Marking task as
SKIPPED. dag_id=test_dag, task_id=skip, execution_date=20211014T040000,
start_date=20211014T110450, end_date=20211014T110455
[2021-10-14 07:06:13,525] {taskinstance.py:1211} INFO - Marking task as
SUCCESS. dag_id=test_dag, task_id=skip, execution_date=20211014T040000,
start_date=20211014T110450, end_date=20211014T110613
[2021-10-14 07:06:13,525] {taskinstance.py:1906} DEBUG - Task Duration set
to 82.700477
[2021-10-14 07:06:13,678] {dagrun.py:490} DEBUG - number of tis tasks for
<DagRun test_dag @ 2021-10-14 04:00:00+00:00:
backfill__2021-10-14T04:00:00+00:00, externally triggered: False>: 2 task(s)
[2021-10-14 07:06:13,679] {dagrun.py:505} DEBUG - number of scheduleable
tasks for <DagRun test_dag @ 2021-10-14 04:00:00+00:00:
backfill__2021-10-14T04:00:00+00:00, externally triggered: False>: 0 task(s)
[2021-10-14 07:06:13,679] {taskinstance.py:1265} INFO - 0 downstream tasks
scheduled from follow-on schedule check
[2021-10-14 07:06:13,682] {debug_executor.py:147} DEBUG - Popping
TaskInstanceKey(dag_id='test_dag', task_id='skip',
execution_date=datetime.datetime(2021, 10, 14, 4, 0, tzinfo=Timezone('UTC')),
try_number=1) from executor task queue.
[2021-10-14 07:06:13,683] {taskinstance.py:614} DEBUG - Refreshing
TaskInstance <TaskInstance: test_dag.skip 2021-10-14 04:00:00+00:00 [skipped]>
from DB
[2021-10-14 07:06:13,688] {taskinstance.py:649} DEBUG - Refreshed
TaskInstance <TaskInstance: test_dag.skip 2021-10-14 04:00:00+00:00 [skipped]>
[2021-10-14 07:06:13,688] {backfill_job.py:274} DEBUG - Executor state:
success task <TaskInstance: test_dag.skip 2021-10-14 04:00:00+00:00 [skipped]>
[2021-10-14 07:06:13,692] {backfill_job.py:215} DEBUG - Task instance
<TaskInstance: test_dag.skip 2021-10-14 04:00:00+00:00 [skipped]> skipped.
Don't rerun.
[2021-10-14 07:06:13,701] {dagrun.py:505} DEBUG - number of scheduleable
tasks for <DagRun test_dag @ 2021-10-14 04:00:00+00:00:
backfill__2021-10-14T04:00:00+00:00, externally triggered: False>: 0 task(s)
[2021-10-14 07:06:13,702] {taskinstance.py:911} DEBUG - <TaskInstance:
test_dag.t1 2021-10-14 04:00:00+00:00 [scheduled]> dependency 'Not In Retry
Period' PASSED: True, The context specified that being in a retry period was
permitted.
[2021-10-14 07:06:13,702] {taskinstance.py:911} DEBUG - <TaskInstance:
test_dag.t1 2021-10-14 04:00:00+00:00 [scheduled]> dependency 'Previous Dagrun
State' PASSED: True, The task did not have depends_on_past set.
[2021-10-14 07:06:13,704] {taskinstance.py:702} DEBUG - Setting task state
for <TaskInstance: test_dag.t1 2021-10-14 04:00:00+00:00 [scheduled]> to skipped
[2021-10-14 07:06:13,704] {taskinstance.py:911} DEBUG - <TaskInstance:
test_dag.t1 2021-10-14 04:00:00+00:00 [skipped]> dependency 'Trigger Rule'
PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to
have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1,
'successes': 0, 'skipped': 1, 'failed': 0, 'upstream_failed': 0, 'done': 1},
upstream_task_ids={'skip'}
[2021-10-14 07:06:13,704] {taskinstance.py:890} DEBUG - Dependencies not met
for <TaskInstance: test_dag.t1 2021-10-14 04:00:00+00:00 [skipped]>, dependency
'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream
tasks to have succeeded, but found 1 non-success(es).
upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 1, 'failed': 0,
'upstream_failed': 0, 'done': 1}, upstream_task_ids={'skip'}
[2021-10-14 07:06:13,704] {taskinstance.py:911} DEBUG - <TaskInstance:
test_dag.t2 2021-10-14 04:00:00+00:00 [scheduled]> dependency 'Not In Retry
Period' PASSED: True, The context specified that being in a retry period was
permitted.
[2021-10-14 07:06:13,704] {taskinstance.py:911} DEBUG - <TaskInstance:
test_dag.t2 2021-10-14 04:00:00+00:00 [scheduled]> dependency 'Previous Dagrun
State' PASSED: True, The task did not have depends_on_past set.
[2021-10-14 07:06:13,705] {taskinstance.py:911} DEBUG - <TaskInstance:
test_dag.t2 2021-10-14 04:00:00+00:00 [scheduled]> dependency 'Trigger Rule'
PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to
have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1,
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0},
upstream_task_ids={'t1'}
[2021-10-14 07:06:13,705] {taskinstance.py:890} DEBUG - Dependencies not met
for <TaskInstance: test_dag.t2 2021-10-14 04:00:00+00:00 [scheduled]>,
dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires
all upstream tasks to have succeeded, but found 1 non-success(es).
upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0,
'upstream_failed': 0, 'done': 0}, upstream_task_ids={'t1'}
[2021-10-14 07:06:13,705] {dagrun.py:459} ERROR - Deadlock; marking run
<DagRun test_dag @ 2021-10-14 04:00:00+00:00:
backfill__2021-10-14T04:00:00+00:00, externally triggered: False> failed
[2021-10-14 07:06:13,711] {backfill_job.py:388} INFO - [backfill progress] |
finished run 1 of 1 | tasks waiting: 2 | succeeded: 0 | running: 0 | failed: 0
| skipped: 1 | deadlocked: 0 | not ready: 0
[2021-10-14 07:06:13,711] {backfill_job.py:391} DEBUG - Finished dag run
loop iteration. Remaining tasks odict_values([<TaskInstance: test_dag.t1
2021-10-14 04:00:00+00:00 [scheduled]>, <TaskInstance: test_dag.t2 2021-10-14
04:00:00+00:00 [scheduled]>])
[2021-10-14 07:06:13,712] {backfill_job.py:423} DEBUG - *** Clearing out
not_ready list ***
[2021-10-14 07:06:13,728] {taskinstance.py:614} DEBUG - Refreshing
TaskInstance <TaskInstance: test_dag.t1 2021-10-14 04:00:00+00:00 [scheduled]>
from DB
[2021-10-14 07:06:13,733] {taskinstance.py:649} DEBUG - Refreshed
TaskInstance <TaskInstance: test_dag.t1 2021-10-14 04:00:00+00:00 [skipped]>
[2021-10-14 07:06:13,733] {backfill_job.py:440} DEBUG - Task instance to run
<TaskInstance: test_dag.t1 2021-10-14 04:00:00+00:00 [skipped]> state skipped
[2021-10-14 07:06:13,733] {backfill_job.py:453} DEBUG - Task instance
<TaskInstance: test_dag.t1 2021-10-14 04:00:00+00:00 [skipped]> skipped. Don't
rerun.
[2021-10-14 07:06:13,739] {taskinstance.py:614} DEBUG - Refreshing
TaskInstance <TaskInstance: test_dag.t2 2021-10-14 04:00:00+00:00 [scheduled]>
from DB
[2021-10-14 07:06:13,743] {taskinstance.py:649} DEBUG - Refreshed
TaskInstance <TaskInstance: test_dag.t2 2021-10-14 04:00:00+00:00 [scheduled]>
[2021-10-14 07:06:13,743] {backfill_job.py:440} DEBUG - Task instance to run
<TaskInstance: test_dag.t2 2021-10-14 04:00:00+00:00 [scheduled]> state
scheduled
[2021-10-14 07:06:13,743] {taskinstance.py:911} DEBUG - <TaskInstance:
test_dag.t2 2021-10-14 04:00:00+00:00 [scheduled]> dependency 'Not In Retry
Period' PASSED: True, The task instance was not marked for retrying.
[2021-10-14 07:06:13,746] {taskinstance.py:911} DEBUG - <TaskInstance:
test_dag.t2 2021-10-14 04:00:00+00:00 [scheduled]> dependency 'Dagrun Running'
PASSED: False, Task instance's dagrun was not in the 'running' state but in the
state 'failed'.
[2021-10-14 07:06:13,746] {taskinstance.py:890} DEBUG - Dependencies not met
for <TaskInstance: test_dag.t2 2021-10-14 04:00:00+00:00 [scheduled]>,
dependency 'Dagrun Running' FAILED: Task instance's dagrun was not in the
'running' state but in the state 'failed'.
[2021-10-14 07:06:13,750] {taskinstance.py:702} DEBUG - Setting task state
for <TaskInstance: test_dag.t2 2021-10-14 04:00:00+00:00 [scheduled]> to skipped
[2021-10-14 07:06:13,753] {taskinstance.py:911} DEBUG - <TaskInstance:
test_dag.t2 2021-10-14 04:00:00+00:00 [skipped]> dependency 'Trigger Rule'
PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to
have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1,
'successes': 0, 'skipped': 1, 'failed': 0, 'upstream_failed': 0, 'done': 1},
upstream_task_ids={'t1'}
[2021-10-14 07:06:13,754] {taskinstance.py:890} DEBUG - Dependencies not met
for <TaskInstance: test_dag.t2 2021-10-14 04:00:00+00:00 [skipped]>, dependency
'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream
tasks to have succeeded, but found 1 non-success(es).
upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 1, 'failed': 0,
'upstream_failed': 0, 'done': 1}, upstream_task_ids={'t1'}
[2021-10-14 07:06:13,754] {taskinstance.py:911} DEBUG - <TaskInstance:
test_dag.t2 2021-10-14 04:00:00+00:00 [skipped]> dependency 'Task Instance Not
Running' PASSED: True, Task is not in running state.
[2021-10-14 07:06:13,754] {taskinstance.py:911} DEBUG - <TaskInstance:
test_dag.t2 2021-10-14 04:00:00+00:00 [skipped]> dependency 'Task Instance
State' PASSED: False, Task is in the 'skipped' state which is not a valid state
for execution. The task must be cleared in order to be run.
[2021-10-14 07:06:13,754] {taskinstance.py:890} DEBUG - Dependencies not met
for <TaskInstance: test_dag.t2 2021-10-14 04:00:00+00:00 [skipped]>, dependency
'Task Instance State' FAILED: Task is in the 'skipped' state which is not a
valid state for execution. The task must be cleared in order to be run.
[2021-10-14 07:06:13,754] {taskinstance.py:911} DEBUG - <TaskInstance:
test_dag.t2 2021-10-14 04:00:00+00:00 [skipped]> dependency 'Previous Dagrun
State' PASSED: True, The task did not have depends_on_past set.
[2021-10-14 07:06:13,754] {backfill_job.py:554} DEBUG - Adding
<TaskInstance: test_dag.t2 2021-10-14 04:00:00+00:00 [skipped]> to not_ready
[2021-10-14 07:06:18,285] {base_job.py:227} DEBUG - [heartbeat]
[2021-10-14 07:06:18,286] {base_executor.py:150} DEBUG - 0 running task
instances
[2021-10-14 07:06:18,286] {base_executor.py:151} DEBUG - 0 in queue
[2021-10-14 07:06:18,286] {base_executor.py:152} DEBUG - 32 open slots
[2021-10-14 07:06:18,286] {base_executor.py:161} DEBUG - Calling the <class
'airflow.executors.debug_executor.DebugExecutor'> sync method
[2021-10-14 07:06:18,288] {backfill_job.py:612} WARNING - Deadlock
discovered for ti_status.to_run=odict_values([<TaskInstance: test_dag.t2
2021-10-14 04:00:00+00:00 [skipped]>])
[2021-10-14 07:06:18,288] {backfill_job.py:388} INFO - [backfill progress] |
finished run 1 of 1 | tasks waiting: 0 | succeeded: 0 | running: 0 | failed: 0
| skipped: 2 | deadlocked: 1 | not ready: 1
[2021-10-14 07:06:18,288] {backfill_job.py:391} DEBUG - Finished dag run
loop iteration. Remaining tasks odict_values([])
[2021-10-14 07:06:18,289] {taskinstance.py:911} DEBUG - <TaskInstance:
test_dag.t2 2021-10-14 04:00:00+00:00 [skipped]> dependency 'Not In Retry
Period' PASSED: True, The task instance was not marked for retrying.
[2021-10-14 07:06:18,289] {taskinstance.py:911} DEBUG - <TaskInstance:
test_dag.t2 2021-10-14 04:00:00+00:00 [skipped]> dependency 'Previous Dagrun
State' PASSED: True, The task did not have depends_on_past set.
[2021-10-14 07:06:18,295] {taskinstance.py:911} DEBUG - <TaskInstance:
test_dag.t2 2021-10-14 04:00:00+00:00 [skipped]> dependency 'Trigger Rule'
PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to
have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1,
'successes': 0, 'skipped': 1, 'failed': 0, 'upstream_failed': 0, 'done': 1},
upstream_task_ids={'t1'}
[2021-10-14 07:06:18,295] {taskinstance.py:890} DEBUG - Dependencies not met
for <TaskInstance: test_dag.t2 2021-10-14 04:00:00+00:00 [skipped]>, dependency
'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream
tasks to have succeeded, but found 1 non-success(es).
upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 1, 'failed': 0,
'upstream_failed': 0, 'done': 1}, upstream_task_ids={'t1'}
[2021-10-14 07:06:18,295] {taskinstance.py:911} DEBUG - <TaskInstance:
test_dag.t2 2021-10-14 04:00:00+00:00 [skipped]> dependency 'Not In Retry
Period' PASSED: True, The task instance was not marked for retrying.
[2021-10-14 07:06:18,295] {taskinstance.py:911} DEBUG - <TaskInstance:
test_dag.t2 2021-10-14 04:00:00+00:00 [skipped]> dependency 'Previous Dagrun
State' PASSED: True, The context specified that the state of past DAGs could be
ignored.
[2021-10-14 07:06:18,300] {taskinstance.py:911} DEBUG - <TaskInstance:
test_dag.t2 2021-10-14 04:00:00+00:00 [skipped]> dependency 'Trigger Rule'
PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to
have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1,
'successes': 0, 'skipped': 1, 'failed': 0, 'upstream_failed': 0, 'done': 1},
upstream_task_ids={'t1'}
[2021-10-14 07:06:18,300] {taskinstance.py:890} DEBUG - Dependencies not met
for <TaskInstance: test_dag.t2 2021-10-14 04:00:00+00:00 [skipped]>, dependency
'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream
tasks to have succeeded, but found 1 non-success(es).
upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 1, 'failed': 0,
'upstream_failed': 0, 'done': 1}, upstream_task_ids={'t1'}
Traceback (most recent call last):
File "/usr/lib64/python3.6/runpy.py", line 193, in _run_module_as_main
"__main__", mod_spec)
File "/usr/lib64/python3.6/runpy.py", line 85, in _run_code
exec(code, run_globals)
File
".../.vscode-server/extensions/ms-python.python-2021.10.1317843341/pythonFiles/lib/python/debugpy/__main__.py",
line 45, in <module>
cli.main()
File
".../.vscode-server/extensions/ms-python.python-2021.10.1317843341/pythonFiles/lib/python/debugpy/../debugpy/server/cli.py",
line 444, in main
run()
File
".../.vscode-server/extensions/ms-python.python-2021.10.1317843341/pythonFiles/lib/python/debugpy/../debugpy/server/cli.py",
line 285, in run_file
runpy.run_path(target_as_str, run_name=compat.force_str("__main__"))
File "/usr/lib64/python3.6/runpy.py", line 263, in run_path
pkg_name=pkg_name, script_name=fname)
File "/usr/lib64/python3.6/runpy.py", line 96, in _run_module_code
mod_name, mod_spec, pkg_name, script_name)
File "/usr/lib64/python3.6/runpy.py", line 85, in _run_code
exec(code, run_globals)
File ".../airflow-script/dags/test_dag.py", line 57, in <module>
dag.run(start_date=start)
File
".../airflow-script/venv/lib/python3.6/site-packages/airflow/models/dag.py",
line 1721, in run
job.run()
File
".../airflow-script/venv/lib/python3.6/site-packages/airflow/jobs/base_job.py",
line 245, in run
self._execute()
File
".../airflow-script/venv/lib/python3.6/site-packages/airflow/utils/session.py",
line 70, in wrapper
return func(*args, session=session, **kwargs)
File
".../airflow-script/venv/lib/python3.6/site-packages/airflow/jobs/backfill_job.py",
line 812, in _execute
raise BackfillUnfinished(err, ti_status)
airflow.exceptions.BackfillUnfinished: BackfillJob is deadlocked.
These tasks have succeeded:
DAG ID Task ID Execution date Try number
-------- --------- ---------------- ------------
These tasks are running:
DAG ID Task ID Execution date Try number
-------- --------- ---------------- ------------
These tasks have failed:
DAG ID Task ID Execution date Try number
-------- --------- ---------------- ------------
These tasks are skipped:
DAG ID Task ID Execution date Try number
-------- --------- ------------------------- ------------
test_dag skip 2021-10-14 04:00:00+00:00 1
test_dag t1 2021-10-14 04:00:00+00:00 1
These tasks are deadlocked:
DAG ID Task ID Execution date Try number
-------- --------- ------------------------- ------------
test_dag t2 2021-10-14 04:00:00+00:00 1
[2021-10-14 07:06:18,824] {settings.py:302} DEBUG - Disposing DB connection
pool (PID 1013165)
```
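For context on why `t1` and `t2` end up skipped at all: with the default `all_success` trigger rule, a skipped upstream cascades the skip downstream instead of running the task. A rough pure-Python sketch of that outcome logic (illustrative only, not Airflow's actual implementation):

```python
def all_success_outcome(upstream_states):
    """Approximate outcome of the default 'all_success' trigger rule
    (illustrative sketch, not Airflow's real code)."""
    if any(s in ('failed', 'upstream_failed') for s in upstream_states):
        return 'upstream_failed'
    if any(s == 'skipped' for s in upstream_states):
        return 'skipped'  # skips cascade downstream
    if all(s == 'success' for s in upstream_states):
        return 'run'
    return 'wait'  # some upstreams are not done yet
```

In the log above, `skip` finishes as skipped, so `t1`'s rule evaluates to skipped, and `t2` follows once `t1` is done. The bug is not the cascade itself but that the backfill loop then deadlocks on the already-skipped `t2`.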
### What you expected to happen
No deadlock: `t1` and `t2` should simply be cascade-skipped and the backfill should finish without raising `BackfillUnfinished`.
### How to reproduce
Any skip chain longer than two tasks, run with the DebugExecutor, will cause a deadlock.
### Anything else
```bash
[2021-10-14 07:06:13,743] {backfill_job.py:440} DEBUG - Task instance to run
<TaskInstance: test_dag.t2 2021-10-14 04:00:00+00:00 [scheduled]> state
scheduled
[2021-10-14 07:06:13,743] {taskinstance.py:911} DEBUG - <TaskInstance:
test_dag.t2 2021-10-14 04:00:00+00:00 [scheduled]> dependency 'Not In Retry
Period' PASSED: True, The task instance was not marked for retrying.
[2021-10-14 07:06:13,746] {taskinstance.py:911} DEBUG - <TaskInstance:
test_dag.t2 2021-10-14 04:00:00+00:00 [scheduled]> dependency 'Dagrun Running'
PASSED: False, Task instance's dagrun was not in the 'running' state but in the
state 'failed'.
[2021-10-14 07:06:13,746] {taskinstance.py:890} DEBUG - Dependencies not met
for <TaskInstance: test_dag.t2 2021-10-14 04:00:00+00:00 [scheduled]>,
dependency 'Dagrun Running' FAILED: Task instance's dagrun was not in the
'running' state but in the state 'failed'.
[2021-10-14 07:06:13,750] {taskinstance.py:702} DEBUG - Setting task state
for <TaskInstance: test_dag.t2 2021-10-14 04:00:00+00:00 [scheduled]> to skipped
[2021-10-14 07:06:13,753] {taskinstance.py:911} DEBUG - <TaskInstance:
test_dag.t2 2021-10-14 04:00:00+00:00 [skipped]> dependency 'Trigger Rule'
PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to
have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1,
'successes': 0, 'skipped': 1, 'failed': 0, 'upstream_failed': 0, 'done': 1},
upstream_task_ids={'t1'}
[2021-10-14 07:06:13,754] {taskinstance.py:890} DEBUG - Dependencies not met
for <TaskInstance: test_dag.t2 2021-10-14 04:00:00+00:00 [skipped]>, dependency
'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream
tasks to have succeeded, but found 1 non-success(es).
upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 1, 'failed': 0,
'upstream_failed': 0, 'done': 1}, upstream_task_ids={'t1'}
[2021-10-14 07:06:13,754] {taskinstance.py:911} DEBUG - <TaskInstance:
test_dag.t2 2021-10-14 04:00:00+00:00 [skipped]> dependency 'Task Instance Not
Running' PASSED: True, Task is not in running state.
[2021-10-14 07:06:13,754] {taskinstance.py:911} DEBUG - <TaskInstance:
test_dag.t2 2021-10-14 04:00:00+00:00 [skipped]> dependency 'Task Instance
State' PASSED: False, Task is in the 'skipped' state which is not a valid state
for execution. The task must be cleared in order to be run.
[2021-10-14 07:06:13,754] {taskinstance.py:890} DEBUG - Dependencies not met
for <TaskInstance: test_dag.t2 2021-10-14 04:00:00+00:00 [skipped]>, dependency
'Task Instance State' FAILED: Task is in the 'skipped' state which is not a
valid state for execution. The task must be cleared in order to be run.
[2021-10-14 07:06:13,754] {taskinstance.py:911} DEBUG - <TaskInstance:
test_dag.t2 2021-10-14 04:00:00+00:00 [skipped]> dependency 'Previous Dagrun
State' PASSED: True, The task did not have depends_on_past set.
[2021-10-14 07:06:13,754] {backfill_job.py:554} DEBUG - Adding
<TaskInstance: test_dag.t2 2021-10-14 04:00:00+00:00 [skipped]> to not_ready
```
backfill_job.py:
```python
...
self.log.debug("Task instance to run %s state %s", ti, ti.state)
# The task was already marked successful or skipped by a
# different Job. Don't rerun it.
if ti.state == State.SUCCESS:
    ...
elif ti.state == State.SKIPPED:
    ...
...
# It seems ti.state changed from 'scheduled' to 'skipped' here, after the
# check above already ran, so the check does not catch it and the task is
# added to 'not_ready'.
if ti.state == State.UPSTREAM_FAILED:
    ...
if ti.state == State.UP_FOR_RETRY:
    ...
if ti.state == State.UP_FOR_RESCHEDULE:
    ...
# all remaining tasks
self.log.debug('Adding %s to not_ready', ti)
ti_status.not_ready.add(key)
```
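The suspected race can be shown with a small self-contained sketch (plain-Python stand-ins, not Airflow's real classes): the loop checks `ti.state` once, the 'Trigger Rule' dependency evaluation then flips the state to skipped as a side effect, and the early `SKIPPED` branch never fires.

```python
class TI:
    """Minimal stand-in for a task instance (illustrative only)."""
    def __init__(self):
        self.state = 'scheduled'


def trigger_rule_dep(ti):
    # Side effect visible in the log: evaluating the 'Trigger Rule'
    # dependency itself flips the task instance to 'skipped'.
    ti.state = 'skipped'
    return False  # dependencies not met


def backfill_step(ti):
    # Shape of the current loop: the SKIPPED branch is checked before
    # the dependency evaluation mutates the state, so it never fires.
    if ti.state == 'skipped':
        return 'dont_rerun'
    if not trigger_rule_dep(ti):
        return 'not_ready'  # task is skipped by now, yet lands here


def possible_fix_step(ti):
    # One possible shape of a fix (an assumption, not a proposed patch):
    # re-check the state after the dependencies have been evaluated.
    if ti.state == 'skipped':
        return 'dont_rerun'
    if not trigger_rule_dep(ti):
        if ti.state == 'skipped':
            return 'dont_rerun'
        return 'not_ready'
```

Here `backfill_step(TI())` returns `'not_ready'`, matching the log's "Adding ... to not_ready", while `possible_fix_step(TI())` returns `'dont_rerun'`.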
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)