anmol commented on issue #18843:
URL: https://github.com/apache/airflow/issues/18843#issuecomment-947419133
Not sure if this qualifies as a separate bug, but I think it might be related.
I started seeing this after upgrading from 2.1.2 to 2.2.0.
Airflow version: 2.2.0
Deployment details
- AWS EKS over own helm chart
- KubernetesExecutor
How to Reproduce: Restart (rollout) the scheduler while a task is running for
dag_id = dag_1, dag_run_1
Error: The scheduler goes into CrashLoopBackOff with the error:
```
{scheduler_job.py:952} ERROR - Couldn't find dag dag_1 in DagBag/DB!
...
airflow.exceptions.AirflowException: The DAG (.dag) for <DagRun dag_run_1>
needs to be set
```
Full stack trace for an actual DAG:
```
[2021-10-20 07:30:42,955] {retries.py:82} DEBUG - Running
SchedulerJob._create_dagruns_for_dags with retries. Try 1 of 3
[2021-10-20 07:30:42,972] {retries.py:82} DEBUG - Running
SchedulerJob._get_next_dagruns_to_examine with retries. Try 1 of 3
[2021-10-20 07:30:42,974] {serialized_dag.py:205} DEBUG - Deleting
Serialized DAGs (for which DAG files are deleted) from serialized_dag table
[2021-10-20 07:30:42,982] {dag.py:2850} DEBUG - Deactivating DAGs (for which
DAG files are deleted) from dag table
[2021-10-20 07:30:42,990] {retries.py:82} DEBUG - Running
SchedulerJob._get_next_dagruns_to_examine with retries. Try 1 of 3
[2021-10-20 07:30:43,000] {scheduler_job.py:952} ERROR - Couldn't find dag
ecicjc_query_table_16 in DagBag/DB!
[2021-10-20 07:30:43,014] {scheduler_job.py:1007} DEBUG - DAG
smart_sensor_group_shard_0 not changed structure, skipping
dagrun.verify_integrity
[2021-10-20 07:30:43,019] {dagrun.py:543} DEBUG - number of tis tasks for
<DagRun smart_sensor_group_shard_0 @ 2021-10-06 06:23:28.320990+00:00:
scheduled__2021-10-06T06:23:28.320990+00:00, externally triggered: False>: 1
task(s)
[2021-10-20 07:30:43,019] {dagrun.py:558} DEBUG - number of scheduleable
tasks for <DagRun smart_sensor_group_shard_0 @ 2021-10-06
06:23:28.320990+00:00: scheduled__2021-10-06T06:23:28.320990+00:00, externally
triggered: False>: 0 task(s)
[2021-10-20 07:30:43,020] {taskinstance.py:1050} DEBUG - <TaskInstance:
smart_sensor_group_shard_0.smart_sensor_task
scheduled__2021-10-06T06:23:28.320990+00:00 [scheduled]> dependency 'Previous
Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2021-10-20 07:30:43,020] {taskinstance.py:1050} DEBUG - <TaskInstance:
smart_sensor_group_shard_0.smart_sensor_task
scheduled__2021-10-06T06:23:28.320990+00:00 [scheduled]> dependency 'Not In
Retry Period' PASSED: True, The context specified that being in a retry period
was permitted.
[2021-10-20 07:30:43,020] {taskinstance.py:1050} DEBUG - <TaskInstance:
smart_sensor_group_shard_0.smart_sensor_task
scheduled__2021-10-06T06:23:28.320990+00:00 [scheduled]> dependency 'Trigger
Rule' PASSED: True, The task instance did not have any upstream tasks.
[2021-10-20 07:30:43,020] {taskinstance.py:1035} DEBUG - Dependencies all
met for <TaskInstance: smart_sensor_group_shard_0.smart_sensor_task
scheduled__2021-10-06T06:23:28.320990+00:00 [scheduled]>
[2021-10-20 07:30:43,025] {scheduler_job.py:603} ERROR - Exception when
executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
File
"/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
line 587, in _execute
self._run_scheduler_loop()
File
"/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
line 668, in _run_scheduler_loop
num_queued_tis = self._do_scheduling(session)
File
"/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
line 758, in _do_scheduling
self._send_dag_callbacks_to_processor(dag_run, callback_to_run)
File
"/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
line 1024, in _send_dag_callbacks_to_processor
dag = dag_run.get_dag()
File
"/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dagrun.py",
line 397, in get_dag
raise AirflowException(f"The DAG (.dag) for {self} needs to be set")
airflow.exceptions.AirflowException: The DAG (.dag) for <DagRun
ecicjc_query_table_16 @ 2020-07-24 07:45:00+00:00:
scheduled__2020-07-24T07:45:00+00:00, externally triggered: False> needs to be
set
[2021-10-20 07:30:43,026] {kubernetes_executor.py:787} INFO - Shutting down
Kubernetes executor
[2021-10-20 07:30:43,026] {kubernetes_executor.py:788} DEBUG - Flushing
task_queue...
[2021-10-20 07:30:43,027] {kubernetes_executor.py:742} DEBUG - Executor
shutting down, task_queue approximate size=0
[2021-10-20 07:30:43,027] {kubernetes_executor.py:790} DEBUG - Flushing
result_queue...
[2021-10-20 07:30:43,027] {kubernetes_executor.py:755} DEBUG - Executor
shutting down, result_queue approximate size=0
[2021-10-20 07:30:43,028] {kubernetes_executor.py:395} DEBUG - Terminating
kube_watcher...
[2021-10-20 07:30:43,033] {kubernetes_executor.py:398} DEBUG -
kube_watcher=<KubernetesJobWatcher(KubernetesJobWatcher-3, stopped)>
[2021-10-20 07:30:43,033] {kubernetes_executor.py:399} DEBUG - Flushing
watcher_queue...
[2021-10-20 07:30:43,034] {kubernetes_executor.py:383} DEBUG - Executor
shutting down, watcher_queue approx. size=0
[2021-10-20 07:30:43,034] {kubernetes_executor.py:403} DEBUG - Shutting down
manager...
[2021-10-20 07:30:43,069] {dagcode.py:132} DEBUG - Deleting code from
dag_code table
[2021-10-20 07:30:44,045] {process_utils.py:100} INFO - Sending
Signals.SIGTERM to GPID 39
[2021-10-20 07:30:45,074] {settings.py:331} DEBUG - Disposing DB connection
pool (PID 52)
[2021-10-20 07:30:45,087] {process_utils.py:66} INFO - Process
psutil.Process(pid=52, status='terminated', started='07:30:42') (52) terminated
with exit code None
[2021-10-20 07:30:45,491] {settings.py:331} DEBUG - Disposing DB connection
pool (PID 53)
[2021-10-20 07:30:45,514] {process_utils.py:66} INFO - Process
psutil.Process(pid=53, status='terminated', started='07:30:42') (53) terminated
with exit code None
[2021-10-20 07:30:45,883] {settings.py:331} DEBUG - Disposing DB connection
pool (PID 54)
[2021-10-20 07:30:45,890] {process_utils.py:212} INFO - Waiting up to 5
seconds for processes to exit...
[2021-10-20 07:30:45,899] {process_utils.py:66} INFO - Process
psutil.Process(pid=54, status='terminated', started='07:30:42') (54) terminated
with exit code None
[2021-10-20 07:30:45,899] {process_utils.py:66} INFO - Process
psutil.Process(pid=39, status='terminated', exitcode=0, started='07:30:42')
(39) terminated with exit code 0
[2021-10-20 07:30:45,900] {scheduler_job.py:614} INFO - Exited execute loop
[2021-10-20 07:30:45,910] {cli_action_loggers.py:84} DEBUG - Calling
callbacks: []
[2021-10-20 07:30:45,913] {settings.py:331} DEBUG - Disposing DB connection
pool (PID 8)
Traceback (most recent call last):
File "/home/airflow/.local/bin/airflow", line 8, in <module>
sys.exit(main())
File
"/home/airflow/.local/lib/python3.7/site-packages/airflow/__main__.py", line
40, in main
args.func(args)
File
"/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/cli_parser.py",
line 48, in command
return func(*args, **kwargs)
File
"/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/cli.py", line
92, in wrapper
return f(*args, **kwargs)
File
"/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/commands/scheduler_command.py",
line 75, in scheduler
_run_scheduler_job(args=args)
File
"/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/commands/scheduler_command.py",
line 46, in _run_scheduler_job
job.run()
File
"/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/base_job.py",
line 245, in run
self._execute()
File
"/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
line 587, in _execute
self._run_scheduler_loop()
File
"/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
line 668, in _run_scheduler_loop
num_queued_tis = self._do_scheduling(session)
File
"/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
line 758, in _do_scheduling
self._send_dag_callbacks_to_processor(dag_run, callback_to_run)
File
"/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
line 1024, in _send_dag_callbacks_to_processor
dag = dag_run.get_dag()
File
"/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dagrun.py",
line 397, in get_dag
raise AirflowException(f"The DAG (.dag) for {self} needs to be set")
airflow.exceptions.AirflowException: The DAG (.dag) for <DagRun
ecicjc_query_table_16 @ 2020-07-24 07:45:00+00:00:
scheduled__2020-07-24T07:45:00+00:00, externally triggered: False> needs to be
set
```
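My reading of the traceback, as a toy sketch rather than the actual Airflow code: the scheduler logs that the DAG is missing, carries on, and the callback step then calls `get_dag()` on a dag_run whose `.dag` was never set. All names below (`ToyDagRun`, `schedule_dag_run`, `send_callbacks`, `do_scheduling`) are hypothetical stand-ins, and the dict is an assumed substitute for the DagBag.

```python
class ToyAirflowException(Exception):
    """Hypothetical stand-in for airflow.exceptions.AirflowException."""


class ToyDagRun:
    """Hypothetical stand-in for a dag_run row; .dag starts unset."""

    def __init__(self, dag_id, run_id):
        self.dag_id = dag_id
        self.run_id = run_id
        self.dag = None

    def get_dag(self):
        # Mirrors the message seen in the traceback.
        if self.dag is None:
            raise ToyAirflowException(
                f"The DAG (.dag) for <DagRun {self.run_id}> needs to be set"
            )
        return self.dag


def schedule_dag_run(dag_run, dagbag, log):
    # Assumption: a missing DAG is only logged here, not treated as fatal.
    dag_run.dag = dagbag.get(dag_run.dag_id)
    if dag_run.dag is None:
        log.append(f"Couldn't find dag {dag_run.dag_id} in DagBag/DB!")
    return None  # no callback to run in this toy model


def send_callbacks(dag_run, callback):
    # Assumption: the DAG is looked up before the callback is inspected.
    dag = dag_run.get_dag()
    return dag, callback


def do_scheduling(dag_runs, dagbag, log):
    for dag_run in dag_runs:
        callback = schedule_dag_run(dag_run, dagbag, log)
        send_callbacks(dag_run, callback)


log = []
error = None
try:
    # Empty dict: the DAG for the running dag_run cannot be found.
    do_scheduling([ToyDagRun("dag_1", "dag_run_1")], dagbag={}, log=log)
except ToyAirflowException as exc:
    error = exc

print(log)
print(error)
```

If this reading is right, any running dag_run whose DAG cannot be loaded would be enough to take the scheduler loop down on every restart.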
At this stage, every dag_run that is in the running state will cause this issue.
The scheduler can be resurrected by deleting *all* dag_runs which are in the
running state, for example for one DAG:
```
delete from dag_run where state='running' and dag_id='ecicjc_query_table_16';
```
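Before running a delete like the one above, it may be worth listing which running dag_runs would be affected. The sketch below uses an in-memory SQLite database with a simplified toy schema, not the real Airflow metadata schema. It assumes the affected runs are those whose `dag_id` has no row in `serialized_dag`; I have not confirmed that this is the exact condition the scheduler checks. `find_orphaned_running_runs` is a hypothetical helper name.

```python
import sqlite3


def find_orphaned_running_runs(conn):
    """Return (dag_id, run_id) for running dag_runs with no serialized DAG."""
    query = """
        SELECT dr.dag_id, dr.run_id
        FROM dag_run AS dr
        LEFT JOIN serialized_dag AS sd ON sd.dag_id = dr.dag_id
        WHERE dr.state = 'running' AND sd.dag_id IS NULL
        ORDER BY dr.dag_id, dr.run_id
    """
    return conn.execute(query).fetchall()


# Toy data only: table and column names are simplified assumptions.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE serialized_dag (dag_id TEXT PRIMARY KEY);
    CREATE TABLE dag_run (dag_id TEXT, run_id TEXT, state TEXT);
    INSERT INTO serialized_dag VALUES ('smart_sensor_group_shard_0');
    INSERT INTO dag_run VALUES
        ('smart_sensor_group_shard_0', 'scheduled__2021-10-06', 'running'),
        ('ecicjc_query_table_16', 'scheduled__2020-07-24', 'running'),
        ('ecicjc_query_table_16', 'scheduled__2020-07-23', 'success');
    """
)

orphans = find_orphaned_running_runs(conn)
print(orphans)
```

Running the select first keeps the delete scoped to the runs that actually have no loadable DAG, instead of dropping every running dag_run.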