anmol commented on issue #18843:
URL: https://github.com/apache/airflow/issues/18843#issuecomment-947419133


   Not sure if this qualifies as a separate bug, but I think it might be related. I started facing it after upgrading from 2.1.2 to 2.2.0.
   airflow version: 2.2.0
   
   Deployment details:
   - AWS EKS with our own Helm chart
   - KubernetesExecutor
   
   How to Reproduce: restart (rollout) the scheduler while a task is running for dag_id = dag_1, dag_run_1 (see the sketch below).
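
   A minimal sketch of the restart I do; the namespace and deployment name are assumptions from my own chart, so adjust for yours:

   ```
   # Restart the scheduler while a task of dag_1 is still running.
   # Namespace and deployment name are placeholders from my own setup.
   kubectl -n airflow rollout restart deployment airflow-scheduler
   ```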
   
   Error: the scheduler goes into CrashLoopBackOff with the following error:
   ```
   {scheduler_job.py:952} ERROR - Couldn't find dag dag_1 in DagBag/DB!
   ...
   airflow.exceptions.AirflowException: The DAG (.dag) for <DagRun dag_run_1> 
needs to be set
   ```
   
   Full stack trace for an actual DAG:
   
   ```
   [2021-10-20 07:30:42,955] {retries.py:82} DEBUG - Running 
SchedulerJob._create_dagruns_for_dags with retries. Try 1 of 3
   [2021-10-20 07:30:42,972] {retries.py:82} DEBUG - Running 
SchedulerJob._get_next_dagruns_to_examine with retries. Try 1 of 3
   [2021-10-20 07:30:42,974] {serialized_dag.py:205} DEBUG - Deleting 
Serialized DAGs (for which DAG files are deleted) from serialized_dag table
   [2021-10-20 07:30:42,982] {dag.py:2850} DEBUG - Deactivating DAGs (for which 
DAG files are deleted) from dag table
   [2021-10-20 07:30:42,990] {retries.py:82} DEBUG - Running 
SchedulerJob._get_next_dagruns_to_examine with retries. Try 1 of 3
   [2021-10-20 07:30:43,000] {scheduler_job.py:952} ERROR - Couldn't find dag 
ecicjc_query_table_16 in DagBag/DB!
   [2021-10-20 07:30:43,014] {scheduler_job.py:1007} DEBUG - DAG 
smart_sensor_group_shard_0 not changed structure, skipping 
dagrun.verify_integrity
   [2021-10-20 07:30:43,019] {dagrun.py:543} DEBUG - number of tis tasks for 
<DagRun smart_sensor_group_shard_0 @ 2021-10-06 06:23:28.320990+00:00: 
scheduled__2021-10-06T06:23:28.320990+00:00, externally triggered: False>: 1 
task(s)
   [2021-10-20 07:30:43,019] {dagrun.py:558} DEBUG - number of scheduleable 
tasks for <DagRun smart_sensor_group_shard_0 @ 2021-10-06 
06:23:28.320990+00:00: scheduled__2021-10-06T06:23:28.320990+00:00, externally 
triggered: False>: 0 task(s)
   [2021-10-20 07:30:43,020] {taskinstance.py:1050} DEBUG - <TaskInstance: 
smart_sensor_group_shard_0.smart_sensor_task 
scheduled__2021-10-06T06:23:28.320990+00:00 [scheduled]> dependency 'Previous 
Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2021-10-20 07:30:43,020] {taskinstance.py:1050} DEBUG - <TaskInstance: 
smart_sensor_group_shard_0.smart_sensor_task 
scheduled__2021-10-06T06:23:28.320990+00:00 [scheduled]> dependency 'Not In 
Retry Period' PASSED: True, The context specified that being in a retry period 
was permitted.
   [2021-10-20 07:30:43,020] {taskinstance.py:1050} DEBUG - <TaskInstance: 
smart_sensor_group_shard_0.smart_sensor_task 
scheduled__2021-10-06T06:23:28.320990+00:00 [scheduled]> dependency 'Trigger 
Rule' PASSED: True, The task instance did not have any upstream tasks.
   [2021-10-20 07:30:43,020] {taskinstance.py:1035} DEBUG - Dependencies all 
met for <TaskInstance: smart_sensor_group_shard_0.smart_sensor_task 
scheduled__2021-10-06T06:23:28.320990+00:00 [scheduled]>
   [2021-10-20 07:30:43,025] {scheduler_job.py:603} ERROR - Exception when 
executing SchedulerJob._run_scheduler_loop
   Traceback (most recent call last):
     File 
"/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
 line 587, in _execute
       self._run_scheduler_loop()
     File 
"/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
 line 668, in _run_scheduler_loop
       num_queued_tis = self._do_scheduling(session)
     File 
"/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
 line 758, in _do_scheduling
       self._send_dag_callbacks_to_processor(dag_run, callback_to_run)
     File 
"/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
 line 1024, in _send_dag_callbacks_to_processor
       dag = dag_run.get_dag()
     File 
"/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dagrun.py", 
line 397, in get_dag
       raise AirflowException(f"The DAG (.dag) for {self} needs to be set")
   airflow.exceptions.AirflowException: The DAG (.dag) for <DagRun 
ecicjc_query_table_16 @ 2020-07-24 07:45:00+00:00: 
scheduled__2020-07-24T07:45:00+00:00, externally triggered: False> needs to be 
set
   [2021-10-20 07:30:43,026] {kubernetes_executor.py:787} INFO - Shutting down 
Kubernetes executor
   [2021-10-20 07:30:43,026] {kubernetes_executor.py:788} DEBUG - Flushing 
task_queue...
   [2021-10-20 07:30:43,027] {kubernetes_executor.py:742} DEBUG - Executor 
shutting down, task_queue approximate size=0
   [2021-10-20 07:30:43,027] {kubernetes_executor.py:790} DEBUG - Flushing 
result_queue...
   [2021-10-20 07:30:43,027] {kubernetes_executor.py:755} DEBUG - Executor 
shutting down, result_queue approximate size=0
   [2021-10-20 07:30:43,028] {kubernetes_executor.py:395} DEBUG - Terminating 
kube_watcher...
   [2021-10-20 07:30:43,033] {kubernetes_executor.py:398} DEBUG - 
kube_watcher=<KubernetesJobWatcher(KubernetesJobWatcher-3, stopped)>
   [2021-10-20 07:30:43,033] {kubernetes_executor.py:399} DEBUG - Flushing 
watcher_queue...
   [2021-10-20 07:30:43,034] {kubernetes_executor.py:383} DEBUG - Executor 
shutting down, watcher_queue approx. size=0
   [2021-10-20 07:30:43,034] {kubernetes_executor.py:403} DEBUG - Shutting down 
manager...
   [2021-10-20 07:30:43,069] {dagcode.py:132} DEBUG - Deleting code from 
dag_code table
   [2021-10-20 07:30:44,045] {process_utils.py:100} INFO - Sending 
Signals.SIGTERM to GPID 39
   [2021-10-20 07:30:45,074] {settings.py:331} DEBUG - Disposing DB connection 
pool (PID 52)
   [2021-10-20 07:30:45,087] {process_utils.py:66} INFO - Process 
psutil.Process(pid=52, status='terminated', started='07:30:42') (52) terminated 
with exit code None
   [2021-10-20 07:30:45,491] {settings.py:331} DEBUG - Disposing DB connection 
pool (PID 53)
   [2021-10-20 07:30:45,514] {process_utils.py:66} INFO - Process 
psutil.Process(pid=53, status='terminated', started='07:30:42') (53) terminated 
with exit code None
   [2021-10-20 07:30:45,883] {settings.py:331} DEBUG - Disposing DB connection 
pool (PID 54)
   [2021-10-20 07:30:45,890] {process_utils.py:212} INFO - Waiting up to 5 
seconds for processes to exit...
   [2021-10-20 07:30:45,899] {process_utils.py:66} INFO - Process 
psutil.Process(pid=54, status='terminated', started='07:30:42') (54) terminated 
with exit code None
   [2021-10-20 07:30:45,899] {process_utils.py:66} INFO - Process 
psutil.Process(pid=39, status='terminated', exitcode=0, started='07:30:42') 
(39) terminated with exit code 0
   [2021-10-20 07:30:45,900] {scheduler_job.py:614} INFO - Exited execute loop
   [2021-10-20 07:30:45,910] {cli_action_loggers.py:84} DEBUG - Calling 
callbacks: []
   [2021-10-20 07:30:45,913] {settings.py:331} DEBUG - Disposing DB connection 
pool (PID 8)
   Traceback (most recent call last):
     File "/home/airflow/.local/bin/airflow", line 8, in <module>
       sys.exit(main())
     File 
"/home/airflow/.local/lib/python3.7/site-packages/airflow/__main__.py", line 
40, in main
       args.func(args)
     File 
"/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/cli_parser.py", 
line 48, in command
       return func(*args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/cli.py", line 
92, in wrapper
       return f(*args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/commands/scheduler_command.py",
 line 75, in scheduler
       _run_scheduler_job(args=args)
     File 
"/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/commands/scheduler_command.py",
 line 46, in _run_scheduler_job
       job.run()
     File 
"/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/base_job.py", 
line 245, in run
       self._execute()
     File 
"/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
 line 587, in _execute
       self._run_scheduler_loop()
     File 
"/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
 line 668, in _run_scheduler_loop
       num_queued_tis = self._do_scheduling(session)
     File 
"/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
 line 758, in _do_scheduling
       self._send_dag_callbacks_to_processor(dag_run, callback_to_run)
     File 
"/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
 line 1024, in _send_dag_callbacks_to_processor
       dag = dag_run.get_dag()
     File 
"/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dagrun.py", 
line 397, in get_dag
       raise AirflowException(f"The DAG (.dag) for {self} needs to be set")
   airflow.exceptions.AirflowException: The DAG (.dag) for <DagRun 
ecicjc_query_table_16 @ 2020-07-24 07:45:00+00:00: 
scheduled__2020-07-24T07:45:00+00:00, externally triggered: False> needs to be 
set
   ```
   
   At this stage, every dag_run that is in the running state will cause this issue.

   The scheduler can be resurrected by deleting *all* dag_runs that are in the running state, e.g.:
   
   ```
   delete from dag_run where state='running' and dag_id='ecicjc_query_table_16';
   ```
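
   To see which runs are affected before deleting, something like the query below works against the metadata DB (the psql connection string is only a placeholder, assuming a PostgreSQL backend):

   ```
   # List running dag_runs that the scheduler will trip over after the restart.
   # Connection string is a placeholder; point psql at your own metadata DB.
   psql "postgresql://airflow:<password>@<db-host>:5432/airflow" \
     -c "SELECT dag_id, run_id, state FROM dag_run WHERE state = 'running';"
   ```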
   
   

