UnightSun opened a new issue #18977:
URL: https://github.com/apache/airflow/issues/18977


   ### Apache Airflow version
   
   2.1.2
   
   ### Operating System
   
   CentOS Linux 8 (Core)
   
   ### Versions of Apache Airflow Providers
   
   ```
   Providers info
   apache-airflow-providers-celery          | 2.0.0
   apache-airflow-providers-ftp             | 2.0.0
   apache-airflow-providers-http            | 2.0.0
   apache-airflow-providers-imap            | 2.0.0
   apache-airflow-providers-microsoft-mssql | 2.0.0
   apache-airflow-providers-mysql           | 2.1.0
   apache-airflow-providers-redis           | 2.0.0
   apache-airflow-providers-sftp            | 2.1.0
   apache-airflow-providers-sqlite          | 2.0.0
   apache-airflow-providers-ssh             | 2.1.0
   ```
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   python_version  | 3.6.8 (default, Aug 24 2020, 17:57:11)  [GCC 8.3.1 
20191121 (Red Hat 8.3.1-5)] 
   
   ### What happened
   
   ```python
   with DAG(...) as dag:
       def raise_():
           raise AirflowSkipException('skip')
   
       skip = PythonOperator(
           task_id='skip',
           python_callable = raise_,
       )
   
       t1 = PythonOperator(
           ...
           python_callable = lambda: 1,
       )
   
       t2 = PythonOperator(
           ...
           python_callable = lambda: 1,
       )
   
       skip >> t1 >> t2
   
   
   if __name__ == '__main__':
       # export AIRFLOW__CORE__EXECUTOR=DebugExecutor
       from airflow.utils.state import State
       from datetime import timedelta, datetime
       dag.clear(dag_run_state=State.NONE)
       today = datetime.today()
       start = datetime(today.year, today.month, today.day)+timedelta(days=0)
       dag.run(start_date=start)
   ```
   prints:
   ```bash
   [2021-10-14 07:06:07,528] {settings.py:208} DEBUG - Setting up DB connection 
pool (PID 1013165)
   [2021-10-14 07:06:08,239] {executor_loader.py:82} DEBUG - Loading core 
executor: DebugExecutor
   [2021-10-14 07:06:08,355] {taskinstance.py:702} DEBUG - Setting task state 
for <TaskInstance: test_dag.skip 2021-10-14 04:00:00+00:00 [None]> to scheduled
   [2021-10-14 07:06:08,360] {taskinstance.py:702} DEBUG - Setting task state 
for <TaskInstance: test_dag.t1 2021-10-14 04:00:00+00:00 [None]> to scheduled
   [2021-10-14 07:06:08,363] {taskinstance.py:702} DEBUG - Setting task state 
for <TaskInstance: test_dag.t2 2021-10-14 04:00:00+00:00 [None]> to scheduled
   [2021-10-14 07:06:08,377] {backfill_job.py:423} DEBUG - *** Clearing out 
not_ready list ***
   [2021-10-14 07:06:08,384] {taskinstance.py:614} DEBUG - Refreshing 
TaskInstance <TaskInstance: test_dag.skip 2021-10-14 04:00:00+00:00 
[scheduled]> from DB
   [2021-10-14 07:06:08,390] {taskinstance.py:649} DEBUG - Refreshed 
TaskInstance <TaskInstance: test_dag.skip 2021-10-14 04:00:00+00:00 [scheduled]>
   [2021-10-14 07:06:08,391] {backfill_job.py:440} DEBUG - Task instance to run 
<TaskInstance: test_dag.skip 2021-10-14 04:00:00+00:00 [scheduled]> state 
scheduled
   [2021-10-14 07:06:08,391] {taskinstance.py:911} DEBUG - <TaskInstance: 
test_dag.skip 2021-10-14 04:00:00+00:00 [scheduled]> dependency 'Not In Retry 
Period' PASSED: True, The task instance was not marked for retrying.
   [2021-10-14 07:06:08,394] {taskinstance.py:911} DEBUG - <TaskInstance: 
test_dag.skip 2021-10-14 04:00:00+00:00 [scheduled]> dependency 'Trigger Rule' 
PASSED: True, The task instance did not have any upstream tasks.
   [2021-10-14 07:06:08,394] {taskinstance.py:911} DEBUG - <TaskInstance: 
test_dag.skip 2021-10-14 04:00:00+00:00 [scheduled]> dependency 'Task Instance 
Not Running' PASSED: True, Task is not in running state.
   [2021-10-14 07:06:08,394] {taskinstance.py:911} DEBUG - <TaskInstance: 
test_dag.skip 2021-10-14 04:00:00+00:00 [scheduled]> dependency 'Task Instance 
State' PASSED: True, Task state scheduled was valid.
   [2021-10-14 07:06:08,394] {taskinstance.py:911} DEBUG - <TaskInstance: 
test_dag.skip 2021-10-14 04:00:00+00:00 [scheduled]> dependency 'Previous 
Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2021-10-14 07:06:08,399] {taskinstance.py:896} DEBUG - Dependencies all met 
for <TaskInstance: test_dag.skip 2021-10-14 04:00:00+00:00 [scheduled]>
   [2021-10-14 07:06:08,399] {backfill_job.py:501} DEBUG - Sending 
<TaskInstance: test_dag.skip 2021-10-14 04:00:00+00:00 [scheduled]> to executor
   [2021-10-14 07:06:08,403] {base_executor.py:82} INFO - Adding to queue: 
['<TaskInstance: test_dag.skip 2021-10-14 04:00:00+00:00 [queued]>']
   [2021-10-14 07:06:08,418] {backfill_job.py:598} DEBUG - Not scheduling since 
DAG concurrency limit is reached.
   [2021-10-14 07:06:13,278] {base_job.py:227} DEBUG - [heartbeat]
   [2021-10-14 07:06:13,278] {base_executor.py:150} DEBUG - 0 running task 
instances
   [2021-10-14 07:06:13,280] {base_executor.py:151} DEBUG - 1 in queue
   [2021-10-14 07:06:13,280] {base_executor.py:152} DEBUG - 32 open slots
   [2021-10-14 07:06:13,280] {base_executor.py:161} DEBUG - Calling the <class 
'airflow.executors.debug_executor.DebugExecutor'> sync method
   [2021-10-14 07:06:13,282] {debug_executor.py:75} DEBUG - Executing task: 
<TaskInstance: test_dag.skip 2021-10-14 04:00:00+00:00 [queued]>
   [2021-10-14 07:06:13,282] {taskinstance.py:614} DEBUG - Refreshing 
TaskInstance <TaskInstance: test_dag.skip 2021-10-14 04:00:00+00:00 [queued]> 
from DB
   [2021-10-14 07:06:13,287] {taskinstance.py:649} DEBUG - Refreshed 
TaskInstance <TaskInstance: test_dag.skip 2021-10-14 04:00:00+00:00 [queued]>
   [2021-10-14 07:06:13,306] {plugins_manager.py:281} DEBUG - Loading plugins
   [2021-10-14 07:06:13,306] {plugins_manager.py:225} DEBUG - Loading plugins 
from directory: .../airflow-script/plugins
   [2021-10-14 07:06:13,477] {plugins_manager.py:240} DEBUG - Importing plugin 
module .../airflow-script/plugins/macros.py
   [2021-10-14 07:06:13,479] {plugins_manager.py:205} DEBUG - Loading plugins 
from entrypoints
   [2021-10-14 07:06:13,494] {plugins_manager.py:297} DEBUG - Loading 1 
plugin(s) took 0.19 seconds
   [2021-10-14 07:06:13,494] {plugins_manager.py:418} DEBUG - Integrate DAG 
plugins
   [2021-10-14 07:06:13,494] {plugins_manager.py:255} DEBUG - Creating module 
airflow.macros.xxxx
   [2021-10-14 07:06:13,502] {taskinstance.py:677} DEBUG - Clearing XCom data
   [2021-10-14 07:06:13,505] {taskinstance.py:684} DEBUG - XCom data cleared
   [2021-10-14 07:06:13,518] {taskinstance.py:1302} INFO - Exporting the 
following env vars:
   AIRFLOW_CTX_DAG_OWNER=airflow
   AIRFLOW_CTX_DAG_ID=test_dag
   AIRFLOW_CTX_TASK_ID=skip
   AIRFLOW_CTX_EXECUTION_DATE=2021-10-14T04:00:00+00:00
   AIRFLOW_CTX_DAG_RUN_ID=backfill__2021-10-14T04:00:00+00:00
   [2021-10-14 07:06:13,518] {taskinstance.py:1167} INFO - skip
   [2021-10-14 07:06:13,519] {taskinstance.py:614} DEBUG - Refreshing 
TaskInstance <TaskInstance: test_dag.skip 2021-10-14 04:00:00+00:00 [queued]> 
from DB
   [2021-10-14 07:06:13,524] {taskinstance.py:649} DEBUG - Refreshed 
TaskInstance <TaskInstance: test_dag.skip 2021-10-14 04:00:00+00:00 [queued]>
   [2021-10-14 07:06:13,525] {taskinstance.py:1177} INFO - Marking task as 
SKIPPED. dag_id=test_dag, task_id=skip, execution_date=20211014T040000, 
start_date=20211014T110450, end_date=20211014T110455
   [2021-10-14 07:06:13,525] {taskinstance.py:1211} INFO - Marking task as 
SUCCESS. dag_id=test_dag, task_id=skip, execution_date=20211014T040000, 
start_date=20211014T110450, end_date=20211014T110613
   [2021-10-14 07:06:13,525] {taskinstance.py:1906} DEBUG - Task Duration set 
to 82.700477
   [2021-10-14 07:06:13,678] {dagrun.py:490} DEBUG - number of tis tasks for 
<DagRun test_dag @ 2021-10-14 04:00:00+00:00: 
backfill__2021-10-14T04:00:00+00:00, externally triggered: False>: 2 task(s)
   [2021-10-14 07:06:13,679] {dagrun.py:505} DEBUG - number of scheduleable 
tasks for <DagRun test_dag @ 2021-10-14 04:00:00+00:00: 
backfill__2021-10-14T04:00:00+00:00, externally triggered: False>: 0 task(s)
   [2021-10-14 07:06:13,679] {taskinstance.py:1265} INFO - 0 downstream tasks 
scheduled from follow-on schedule check
   [2021-10-14 07:06:13,682] {debug_executor.py:147} DEBUG - Popping 
TaskInstanceKey(dag_id='test_dag', task_id='skip', 
execution_date=datetime.datetime(2021, 10, 14, 4, 0, tzinfo=Timezone('UTC')), 
try_number=1) from executor task queue.
   [2021-10-14 07:06:13,683] {taskinstance.py:614} DEBUG - Refreshing 
TaskInstance <TaskInstance: test_dag.skip 2021-10-14 04:00:00+00:00 [skipped]> 
from DB
   [2021-10-14 07:06:13,688] {taskinstance.py:649} DEBUG - Refreshed 
TaskInstance <TaskInstance: test_dag.skip 2021-10-14 04:00:00+00:00 [skipped]>
   [2021-10-14 07:06:13,688] {backfill_job.py:274} DEBUG - Executor state: 
success task <TaskInstance: test_dag.skip 2021-10-14 04:00:00+00:00 [skipped]>
   [2021-10-14 07:06:13,692] {backfill_job.py:215} DEBUG - Task instance 
<TaskInstance: test_dag.skip 2021-10-14 04:00:00+00:00 [skipped]> skipped. 
Don't rerun.
   [2021-10-14 07:06:13,701] {dagrun.py:505} DEBUG - number of scheduleable 
tasks for <DagRun test_dag @ 2021-10-14 04:00:00+00:00: 
backfill__2021-10-14T04:00:00+00:00, externally triggered: False>: 0 task(s)
   [2021-10-14 07:06:13,702] {taskinstance.py:911} DEBUG - <TaskInstance: 
test_dag.t1 2021-10-14 04:00:00+00:00 [scheduled]> dependency 'Not In Retry 
Period' PASSED: True, The context specified that being in a retry period was 
permitted.
   [2021-10-14 07:06:13,702] {taskinstance.py:911} DEBUG - <TaskInstance: 
test_dag.t1 2021-10-14 04:00:00+00:00 [scheduled]> dependency 'Previous Dagrun 
State' PASSED: True, The task did not have depends_on_past set.
   [2021-10-14 07:06:13,704] {taskinstance.py:702} DEBUG - Setting task state 
for <TaskInstance: test_dag.t1 2021-10-14 04:00:00+00:00 [scheduled]> to skipped
   [2021-10-14 07:06:13,704] {taskinstance.py:911} DEBUG - <TaskInstance: 
test_dag.t1 2021-10-14 04:00:00+00:00 [skipped]> dependency 'Trigger Rule' 
PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to 
have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 
'successes': 0, 'skipped': 1, 'failed': 0, 'upstream_failed': 0, 'done': 1}, 
upstream_task_ids={'skip'}
   [2021-10-14 07:06:13,704] {taskinstance.py:890} DEBUG - Dependencies not met 
for <TaskInstance: test_dag.t1 2021-10-14 04:00:00+00:00 [skipped]>, dependency 
'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream 
tasks to have succeeded, but found 1 non-success(es). 
upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 1, 'failed': 0, 
'upstream_failed': 0, 'done': 1}, upstream_task_ids={'skip'}
   [2021-10-14 07:06:13,704] {taskinstance.py:911} DEBUG - <TaskInstance: 
test_dag.t2 2021-10-14 04:00:00+00:00 [scheduled]> dependency 'Not In Retry 
Period' PASSED: True, The context specified that being in a retry period was 
permitted.
   [2021-10-14 07:06:13,704] {taskinstance.py:911} DEBUG - <TaskInstance: 
test_dag.t2 2021-10-14 04:00:00+00:00 [scheduled]> dependency 'Previous Dagrun 
State' PASSED: True, The task did not have depends_on_past set.
   [2021-10-14 07:06:13,705] {taskinstance.py:911} DEBUG - <TaskInstance: 
test_dag.t2 2021-10-14 04:00:00+00:00 [scheduled]> dependency 'Trigger Rule' 
PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to 
have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, 
upstream_task_ids={'t1'}
   [2021-10-14 07:06:13,705] {taskinstance.py:890} DEBUG - Dependencies not met 
for <TaskInstance: test_dag.t2 2021-10-14 04:00:00+00:00 [scheduled]>, 
dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires 
all upstream tasks to have succeeded, but found 1 non-success(es). 
upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 
'upstream_failed': 0, 'done': 0}, upstream_task_ids={'t1'}
   [2021-10-14 07:06:13,705] {dagrun.py:459} ERROR - Deadlock; marking run 
<DagRun test_dag @ 2021-10-14 04:00:00+00:00: 
backfill__2021-10-14T04:00:00+00:00, externally triggered: False> failed
   [2021-10-14 07:06:13,711] {backfill_job.py:388} INFO - [backfill progress] | 
finished run 1 of 1 | tasks waiting: 2 | succeeded: 0 | running: 0 | failed: 0 
| skipped: 1 | deadlocked: 0 | not ready: 0
   [2021-10-14 07:06:13,711] {backfill_job.py:391} DEBUG - Finished dag run 
loop iteration. Remaining tasks odict_values([<TaskInstance: test_dag.t1 
2021-10-14 04:00:00+00:00 [scheduled]>, <TaskInstance: test_dag.t2 2021-10-14 
04:00:00+00:00 [scheduled]>])
   [2021-10-14 07:06:13,712] {backfill_job.py:423} DEBUG - *** Clearing out 
not_ready list ***
   [2021-10-14 07:06:13,728] {taskinstance.py:614} DEBUG - Refreshing 
TaskInstance <TaskInstance: test_dag.t1 2021-10-14 04:00:00+00:00 [scheduled]> 
from DB
   [2021-10-14 07:06:13,733] {taskinstance.py:649} DEBUG - Refreshed 
TaskInstance <TaskInstance: test_dag.t1 2021-10-14 04:00:00+00:00 [skipped]>
   [2021-10-14 07:06:13,733] {backfill_job.py:440} DEBUG - Task instance to run 
<TaskInstance: test_dag.t1 2021-10-14 04:00:00+00:00 [skipped]> state skipped
   [2021-10-14 07:06:13,733] {backfill_job.py:453} DEBUG - Task instance 
<TaskInstance: test_dag.t1 2021-10-14 04:00:00+00:00 [skipped]> skipped. Don't 
rerun.
   [2021-10-14 07:06:13,739] {taskinstance.py:614} DEBUG - Refreshing 
TaskInstance <TaskInstance: test_dag.t2 2021-10-14 04:00:00+00:00 [scheduled]> 
from DB
   [2021-10-14 07:06:13,743] {taskinstance.py:649} DEBUG - Refreshed 
TaskInstance <TaskInstance: test_dag.t2 2021-10-14 04:00:00+00:00 [scheduled]>
   [2021-10-14 07:06:13,743] {backfill_job.py:440} DEBUG - Task instance to run 
<TaskInstance: test_dag.t2 2021-10-14 04:00:00+00:00 [scheduled]> state 
scheduled
   [2021-10-14 07:06:13,743] {taskinstance.py:911} DEBUG - <TaskInstance: 
test_dag.t2 2021-10-14 04:00:00+00:00 [scheduled]> dependency 'Not In Retry 
Period' PASSED: True, The task instance was not marked for retrying.
   [2021-10-14 07:06:13,746] {taskinstance.py:911} DEBUG - <TaskInstance: 
test_dag.t2 2021-10-14 04:00:00+00:00 [scheduled]> dependency 'Dagrun Running' 
PASSED: False, Task instance's dagrun was not in the 'running' state but in the 
state 'failed'.
   [2021-10-14 07:06:13,746] {taskinstance.py:890} DEBUG - Dependencies not met 
for <TaskInstance: test_dag.t2 2021-10-14 04:00:00+00:00 [scheduled]>, 
dependency 'Dagrun Running' FAILED: Task instance's dagrun was not in the 
'running' state but in the state 'failed'.
   [2021-10-14 07:06:13,750] {taskinstance.py:702} DEBUG - Setting task state 
for <TaskInstance: test_dag.t2 2021-10-14 04:00:00+00:00 [scheduled]> to skipped
   [2021-10-14 07:06:13,753] {taskinstance.py:911} DEBUG - <TaskInstance: 
test_dag.t2 2021-10-14 04:00:00+00:00 [skipped]> dependency 'Trigger Rule' 
PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to 
have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 
'successes': 0, 'skipped': 1, 'failed': 0, 'upstream_failed': 0, 'done': 1}, 
upstream_task_ids={'t1'}
   [2021-10-14 07:06:13,754] {taskinstance.py:890} DEBUG - Dependencies not met 
for <TaskInstance: test_dag.t2 2021-10-14 04:00:00+00:00 [skipped]>, dependency 
'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream 
tasks to have succeeded, but found 1 non-success(es). 
upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 1, 'failed': 0, 
'upstream_failed': 0, 'done': 1}, upstream_task_ids={'t1'}
   [2021-10-14 07:06:13,754] {taskinstance.py:911} DEBUG - <TaskInstance: 
test_dag.t2 2021-10-14 04:00:00+00:00 [skipped]> dependency 'Task Instance Not 
Running' PASSED: True, Task is not in running state.
   [2021-10-14 07:06:13,754] {taskinstance.py:911} DEBUG - <TaskInstance: 
test_dag.t2 2021-10-14 04:00:00+00:00 [skipped]> dependency 'Task Instance 
State' PASSED: False, Task is in the 'skipped' state which is not a valid state 
for execution. The task must be cleared in order to be run.
   [2021-10-14 07:06:13,754] {taskinstance.py:890} DEBUG - Dependencies not met 
for <TaskInstance: test_dag.t2 2021-10-14 04:00:00+00:00 [skipped]>, dependency 
'Task Instance State' FAILED: Task is in the 'skipped' state which is not a 
valid state for execution. The task must be cleared in order to be run.
   [2021-10-14 07:06:13,754] {taskinstance.py:911} DEBUG - <TaskInstance: 
test_dag.t2 2021-10-14 04:00:00+00:00 [skipped]> dependency 'Previous Dagrun 
State' PASSED: True, The task did not have depends_on_past set.
   [2021-10-14 07:06:13,754] {backfill_job.py:554} DEBUG - Adding 
<TaskInstance: test_dag.t2 2021-10-14 04:00:00+00:00 [skipped]> to not_ready
   [2021-10-14 07:06:18,285] {base_job.py:227} DEBUG - [heartbeat]
   [2021-10-14 07:06:18,286] {base_executor.py:150} DEBUG - 0 running task 
instances
   [2021-10-14 07:06:18,286] {base_executor.py:151} DEBUG - 0 in queue
   [2021-10-14 07:06:18,286] {base_executor.py:152} DEBUG - 32 open slots
   [2021-10-14 07:06:18,286] {base_executor.py:161} DEBUG - Calling the <class 
'airflow.executors.debug_executor.DebugExecutor'> sync method
   [2021-10-14 07:06:18,288] {backfill_job.py:612} WARNING - Deadlock 
discovered for ti_status.to_run=odict_values([<TaskInstance: test_dag.t2 
2021-10-14 04:00:00+00:00 [skipped]>])
   [2021-10-14 07:06:18,288] {backfill_job.py:388} INFO - [backfill progress] | 
finished run 1 of 1 | tasks waiting: 0 | succeeded: 0 | running: 0 | failed: 0 
| skipped: 2 | deadlocked: 1 | not ready: 1
   [2021-10-14 07:06:18,288] {backfill_job.py:391} DEBUG - Finished dag run 
loop iteration. Remaining tasks odict_values([])
   [2021-10-14 07:06:18,289] {taskinstance.py:911} DEBUG - <TaskInstance: 
test_dag.t2 2021-10-14 04:00:00+00:00 [skipped]> dependency 'Not In Retry 
Period' PASSED: True, The task instance was not marked for retrying.
   [2021-10-14 07:06:18,289] {taskinstance.py:911} DEBUG - <TaskInstance: 
test_dag.t2 2021-10-14 04:00:00+00:00 [skipped]> dependency 'Previous Dagrun 
State' PASSED: True, The task did not have depends_on_past set.
   [2021-10-14 07:06:18,295] {taskinstance.py:911} DEBUG - <TaskInstance: 
test_dag.t2 2021-10-14 04:00:00+00:00 [skipped]> dependency 'Trigger Rule' 
PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to 
have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 
'successes': 0, 'skipped': 1, 'failed': 0, 'upstream_failed': 0, 'done': 1}, 
upstream_task_ids={'t1'}
   [2021-10-14 07:06:18,295] {taskinstance.py:890} DEBUG - Dependencies not met 
for <TaskInstance: test_dag.t2 2021-10-14 04:00:00+00:00 [skipped]>, dependency 
'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream 
tasks to have succeeded, but found 1 non-success(es). 
upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 1, 'failed': 0, 
'upstream_failed': 0, 'done': 1}, upstream_task_ids={'t1'}
   [2021-10-14 07:06:18,295] {taskinstance.py:911} DEBUG - <TaskInstance: 
test_dag.t2 2021-10-14 04:00:00+00:00 [skipped]> dependency 'Not In Retry 
Period' PASSED: True, The task instance was not marked for retrying.
   [2021-10-14 07:06:18,295] {taskinstance.py:911} DEBUG - <TaskInstance: 
test_dag.t2 2021-10-14 04:00:00+00:00 [skipped]> dependency 'Previous Dagrun 
State' PASSED: True, The context specified that the state of past DAGs could be 
ignored.
   [2021-10-14 07:06:18,300] {taskinstance.py:911} DEBUG - <TaskInstance: 
test_dag.t2 2021-10-14 04:00:00+00:00 [skipped]> dependency 'Trigger Rule' 
PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to 
have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 
'successes': 0, 'skipped': 1, 'failed': 0, 'upstream_failed': 0, 'done': 1}, 
upstream_task_ids={'t1'}
   [2021-10-14 07:06:18,300] {taskinstance.py:890} DEBUG - Dependencies not met 
for <TaskInstance: test_dag.t2 2021-10-14 04:00:00+00:00 [skipped]>, dependency 
'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream 
tasks to have succeeded, but found 1 non-success(es). 
upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 1, 'failed': 0, 
'upstream_failed': 0, 'done': 1}, upstream_task_ids={'t1'}
   Traceback (most recent call last):
     File "/usr/lib64/python3.6/runpy.py", line 193, in _run_module_as_main
       "__main__", mod_spec)
     File "/usr/lib64/python3.6/runpy.py", line 85, in _run_code
       exec(code, run_globals)
     File 
".../.vscode-server/extensions/ms-python.python-2021.10.1317843341/pythonFiles/lib/python/debugpy/__main__.py",
 line 45, in <module>
       cli.main()
     File 
".../.vscode-server/extensions/ms-python.python-2021.10.1317843341/pythonFiles/lib/python/debugpy/../debugpy/server/cli.py",
 line 444, in main
       run()
     File 
".../.vscode-server/extensions/ms-python.python-2021.10.1317843341/pythonFiles/lib/python/debugpy/../debugpy/server/cli.py",
 line 285, in run_file
       runpy.run_path(target_as_str, run_name=compat.force_str("__main__"))
     File "/usr/lib64/python3.6/runpy.py", line 263, in run_path
       pkg_name=pkg_name, script_name=fname)
     File "/usr/lib64/python3.6/runpy.py", line 96, in _run_module_code
       mod_name, mod_spec, pkg_name, script_name)
     File "/usr/lib64/python3.6/runpy.py", line 85, in _run_code
       exec(code, run_globals)
     File ".../airflow-script/dags/test_dag.py", line 57, in <module>
       dag.run(start_date=start)
     File 
".../airflow-script/venv/lib/python3.6/site-packages/airflow/models/dag.py", 
line 1721, in run
       job.run()
     File 
".../airflow-script/venv/lib/python3.6/site-packages/airflow/jobs/base_job.py", 
line 245, in run
       self._execute()
     File 
".../airflow-script/venv/lib/python3.6/site-packages/airflow/utils/session.py", 
line 70, in wrapper
       return func(*args, session=session, **kwargs)
     File 
".../airflow-script/venv/lib/python3.6/site-packages/airflow/jobs/backfill_job.py",
 line 812, in _execute
       raise BackfillUnfinished(err, ti_status)
   airflow.exceptions.BackfillUnfinished: BackfillJob is deadlocked.
   These tasks have succeeded:
   DAG ID    Task ID    Execution date    Try number
   --------  ---------  ----------------  ------------
   
   These tasks are running:
   DAG ID    Task ID    Execution date    Try number
   --------  ---------  ----------------  ------------
   
   These tasks have failed:
   DAG ID    Task ID    Execution date    Try number
   --------  ---------  ----------------  ------------
   
   These tasks are skipped:
   DAG ID    Task ID    Execution date               Try number
   --------  ---------  -------------------------  ------------
   test_dag  skip       2021-10-14 04:00:00+00:00             1
   test_dag  t1         2021-10-14 04:00:00+00:00             1
   
   These tasks are deadlocked:
   DAG ID    Task ID    Execution date               Try number
   --------  ---------  -------------------------  ------------
   test_dag  t2         2021-10-14 04:00:00+00:00             1
   [2021-10-14 07:06:18,824] {settings.py:302} DEBUG - Disposing DB connection 
pool (PID 1013165)
   ```
   
   ### What you expected to happen
   
   No deadlock
   
   ### How to reproduce
   
   Any skip-chain length > 2, triggered by DebugExecutor will case a deadlock.
   
   ### Anything else
   
   ```bash
   [2021-10-14 07:06:13,743] {backfill_job.py:440} DEBUG - Task instance to run 
<TaskInstance: test_dag.t2 2021-10-14 04:00:00+00:00 [scheduled]> state 
scheduled
   [2021-10-14 07:06:13,743] {taskinstance.py:911} DEBUG - <TaskInstance: 
test_dag.t2 2021-10-14 04:00:00+00:00 [scheduled]> dependency 'Not In Retry 
Period' PASSED: True, The task instance was not marked for retrying.
   [2021-10-14 07:06:13,746] {taskinstance.py:911} DEBUG - <TaskInstance: 
test_dag.t2 2021-10-14 04:00:00+00:00 [scheduled]> dependency 'Dagrun Running' 
PASSED: False, Task instance's dagrun was not in the 'running' state but in the 
state 'failed'.
   [2021-10-14 07:06:13,746] {taskinstance.py:890} DEBUG - Dependencies not met 
for <TaskInstance: test_dag.t2 2021-10-14 04:00:00+00:00 [scheduled]>, 
dependency 'Dagrun Running' FAILED: Task instance's dagrun was not in the 
'running' state but in the state 'failed'.
   [2021-10-14 07:06:13,750] {taskinstance.py:702} DEBUG - Setting task state 
for <TaskInstance: test_dag.t2 2021-10-14 04:00:00+00:00 [scheduled]> to skipped
   [2021-10-14 07:06:13,753] {taskinstance.py:911} DEBUG - <TaskInstance: 
test_dag.t2 2021-10-14 04:00:00+00:00 [skipped]> dependency 'Trigger Rule' 
PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to 
have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 
'successes': 0, 'skipped': 1, 'failed': 0, 'upstream_failed': 0, 'done': 1}, 
upstream_task_ids={'t1'}
   [2021-10-14 07:06:13,754] {taskinstance.py:890} DEBUG - Dependencies not met 
for <TaskInstance: test_dag.t2 2021-10-14 04:00:00+00:00 [skipped]>, dependency 
'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream 
tasks to have succeeded, but found 1 non-success(es). 
upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 1, 'failed': 0, 
'upstream_failed': 0, 'done': 1}, upstream_task_ids={'t1'}
   [2021-10-14 07:06:13,754] {taskinstance.py:911} DEBUG - <TaskInstance: 
test_dag.t2 2021-10-14 04:00:00+00:00 [skipped]> dependency 'Task Instance Not 
Running' PASSED: True, Task is not in running state.
   [2021-10-14 07:06:13,754] {taskinstance.py:911} DEBUG - <TaskInstance: 
test_dag.t2 2021-10-14 04:00:00+00:00 [skipped]> dependency 'Task Instance 
State' PASSED: False, Task is in the 'skipped' state which is not a valid state 
for execution. The task must be cleared in order to be run.
   [2021-10-14 07:06:13,754] {taskinstance.py:890} DEBUG - Dependencies not met 
for <TaskInstance: test_dag.t2 2021-10-14 04:00:00+00:00 [skipped]>, dependency 
'Task Instance State' FAILED: Task is in the 'skipped' state which is not a 
valid state for execution. The task must be cleared in order to be run.
   [2021-10-14 07:06:13,754] {taskinstance.py:911} DEBUG - <TaskInstance: 
test_dag.t2 2021-10-14 04:00:00+00:00 [skipped]> dependency 'Previous Dagrun 
State' PASSED: True, The task did not have depends_on_past set.
   [2021-10-14 07:06:13,754] {backfill_job.py:554} DEBUG - Adding 
<TaskInstance: test_dag.t2 2021-10-14 04:00:00+00:00 [skipped]> to not_ready
   ```
   backfill_job.py:
   ```python
   
   ...
   
   self.log.debug("Task instance to run %s state %s", ti, ti.state)
   
   # The task was already marked successful or skipped by a
   # different Job. Don't rerun it.
   if ti.state == State.SUCCESS:
       ...
   elif ti.state == State.SKIPPED:
       ...
   
   ... # seems like ti.state changed from 'scheduled' to 'skipped' here, state 
check not work, so task added into 'not_ready'
   
   if ti.state == State.UPSTREAM_FAILED:
       ...
   if ti.state == State.UP_FOR_RETRY:
       ...
   if ti.state == State.UP_FOR_RESCHEDULE:
       ...
   
   # all remaining tasks
   self.log.debug('Adding %s to not_ready', ti)
   ti_status.not_ready.add(key)
   ```
   
   
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to