humit0 opened a new issue #19671:
URL: https://github.com/apache/airflow/issues/19671
### Apache Airflow version
2.2.0
### Operating System
CentOS 7
### Versions of Apache Airflow Providers
_No response_
### Deployment
Other
### Deployment details
_No response_
### What happened
The scheduler process is killed with the following error:
```
[2021-11-18 11:41:49,826] {scheduler_job.py:603} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
  File "/opt/app-root/lib64/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 587, in _execute
    self._run_scheduler_loop()
  File "/opt/app-root/lib64/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 680, in _run_scheduler_loop
    next_event = timers.run(blocking=False)
  File "/opt/rh/rh-python38/root/usr/lib64/python3.8/sched.py", line 151, in run
    action(*argument, **kwargs)
  File "/opt/app-root/lib64/python3.8/site-packages/airflow/utils/event_scheduler.py", line 36, in repeat
    action(*args, **kwargs)
  File "/opt/app-root/lib64/python3.8/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/app-root/lib64/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 1069, in adopt_or_reset_orphaned_tasks
    for attempt in run_with_db_retries(logger=self.log):
  File "/opt/app-root/lib64/python3.8/site-packages/tenacity/__init__.py", line 382, in __iter__
    do = self.iter(retry_state=retry_state)
  File "/opt/app-root/lib64/python3.8/site-packages/tenacity/__init__.py", line 349, in iter
    return fut.result()
  File "/opt/rh/rh-python38/root/usr/lib64/python3.8/concurrent/futures/_base.py", line 432, in result
    return self.__get_result()
  File "/opt/rh/rh-python38/root/usr/lib64/python3.8/concurrent/futures/_base.py", line 388, in __get_result
    raise self._exception
  File "/opt/app-root/lib64/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 1113, in adopt_or_reset_orphaned_tasks
    to_reset = self.executor.try_adopt_task_instances(tis_to_reset_or_adopt)
  File "/opt/app-root/lib64/python3.8/site-packages/airflow/executors/celery_executor.py", line 485, in try_adopt_task_instances
    self.adopted_task_timeouts[ti.key] = ti.queued_dttm + self.task_adoption_timeout
TypeError: unsupported operand type(s) for +: 'NoneType' and 'datetime.timedelta'
[2021-11-18 11:41:50,850] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 41
[2021-11-18 11:41:51,103] {process_utils.py:66} INFO - Process psutil.Process(pid=41, status='terminated', exitcode=0, started='11:00:42') (41) terminated with exit code 0
[2021-11-18 11:41:51,104] {scheduler_job.py:614} INFO - Exited execute loop
Traceback (most recent call last):
  File "/opt/app-root/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/opt/app-root/lib64/python3.8/site-packages/airflow/__main__.py", line 40, in main
    args.func(args)
  File "/opt/app-root/lib64/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/opt/app-root/lib64/python3.8/site-packages/airflow/utils/cli.py", line 92, in wrapper
    return f(*args, **kwargs)
  File "/opt/app-root/lib64/python3.8/site-packages/airflow/cli/commands/scheduler_command.py", line 75, in scheduler
    _run_scheduler_job(args=args)
  File "/opt/app-root/lib64/python3.8/site-packages/airflow/cli/commands/scheduler_command.py", line 46, in _run_scheduler_job
    job.run()
  File "/opt/app-root/lib64/python3.8/site-packages/airflow/jobs/base_job.py", line 245, in run
    self._execute()
  File "/opt/app-root/lib64/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 587, in _execute
    self._run_scheduler_loop()
  File "/opt/app-root/lib64/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 680, in _run_scheduler_loop
    next_event = timers.run(blocking=False)
  File "/opt/rh/rh-python38/root/usr/lib64/python3.8/sched.py", line 151, in run
    action(*argument, **kwargs)
  File "/opt/app-root/lib64/python3.8/site-packages/airflow/utils/event_scheduler.py", line 36, in repeat
    action(*args, **kwargs)
  File "/opt/app-root/lib64/python3.8/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/app-root/lib64/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 1069, in adopt_or_reset_orphaned_tasks
    for attempt in run_with_db_retries(logger=self.log):
  File "/opt/app-root/lib64/python3.8/site-packages/tenacity/__init__.py", line 382, in __iter__
    do = self.iter(retry_state=retry_state)
  File "/opt/app-root/lib64/python3.8/site-packages/tenacity/__init__.py", line 349, in iter
    return fut.result()
  File "/opt/rh/rh-python38/root/usr/lib64/python3.8/concurrent/futures/_base.py", line 432, in result
    return self.__get_result()
  File "/opt/rh/rh-python38/root/usr/lib64/python3.8/concurrent/futures/_base.py", line 388, in __get_result
    raise self._exception
  File "/opt/app-root/lib64/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 1113, in adopt_or_reset_orphaned_tasks
    to_reset = self.executor.try_adopt_task_instances(tis_to_reset_or_adopt)
  File "/opt/app-root/lib64/python3.8/site-packages/airflow/executors/celery_executor.py", line 485, in try_adopt_task_instances
    self.adopted_task_timeouts[ti.key] = ti.queued_dttm + self.task_adoption_timeout
TypeError: unsupported operand type(s) for +: 'NoneType' and 'datetime.timedelta'
```
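
The last frame of the traceback adds `self.task_adoption_timeout` (a `timedelta`) to `ti.queued_dttm`, which is `None` for this task instance. The sketch below reproduces the same `TypeError` in plain Python and shows one possible guard. `FakeTaskInstance`, `adoption_deadline_unguarded`, and `adoption_deadline` are hypothetical names used only for illustration; this is not Airflow's code or its actual fix, and both the 600-second timeout and the fallback to the current time are assumptions.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


# Hypothetical stand-in for a TaskInstance: only the fields involved in the crash.
@dataclass
class FakeTaskInstance:
    key: str
    queued_dttm: Optional[datetime]


TASK_ADOPTION_TIMEOUT = timedelta(seconds=600)  # assumed value for illustration


def adoption_deadline_unguarded(ti: FakeTaskInstance) -> datetime:
    # Same expression as celery_executor.py line 485 in the traceback.
    return ti.queued_dttm + TASK_ADOPTION_TIMEOUT


def adoption_deadline(ti: FakeTaskInstance, now: datetime) -> datetime:
    # Assumed guard: if queued_dttm was never set, start the timeout from `now`.
    start = ti.queued_dttm if ti.queued_dttm is not None else now
    return start + TASK_ADOPTION_TIMEOUT


ti = FakeTaskInstance(key="active_run_test.sleep_task1", queued_dttm=None)
now = datetime(2021, 11, 18, tzinfo=timezone.utc)  # arbitrary fixed time

try:
    adoption_deadline_unguarded(ti)
except TypeError as exc:
    print(exc)  # unsupported operand type(s) for +: 'NoneType' and 'datetime.timedelta'

print(adoption_deadline(ti, now))  # 2021-11-18 00:10:00+00:00
```

Whether falling back to the current time is the right behavior for adoption (as opposed to skipping or resetting such task instances) is a design decision for the maintainers; the sketch only shows that an explicit `None` check avoids the crash.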
### What you expected to happen
_No response_
### How to reproduce
Environment Variables
```bash
export AIRFLOW_HOME="/home/deploy/bi-airflow"
export AIRFLOW__CORE__SQL_ALCHEMY_CONN="mysql://****/airflow?charset=utf8mb4"
export AIRFLOW__CORE__LOAD_EXAMPLES="False"
export AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS="False"
export AIRFLOW__CORE__DEFAULT_TIMEZONE="Asia/Seoul"
export AIRFLOW__WEBSERVER__DEFAULT_UI_TIMEZONE="Asia/Seoul"
export AIRFLOW__API__AUTH_BACKEND="airflow.api.auth.backend.basic_auth"
export AIRFLOW__WEBSERVER__RELOAD_ON_PLUGIN_CHANGE="True"
export AIRFLOW__CELERY__BROKER_URL="pyamqp://******/airflow"
export AIRFLOW__CELERY__RESULT_BACKEND="db+mysql://*****/airflow?charset=utf8mb4"
export AIRFLOW__CELERY__FLOWER_URL_PREFIX="/flower"
export AIRFLOW__CORE__FERNET_KEY="*******"
# Settings required to reproduce the issue:
export AIRFLOW__CORE__EXECUTOR="CeleryExecutor"
export AIRFLOW__SCHEDULER__ORPHANED_TASKS_CHECK_INTERVAL="30.0"
export AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL="10"
```
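
Airflow reads an environment variable named `AIRFLOW__{SECTION}__{KEY}` as option `key` in section `[section]` of `airflow.cfg`. The hypothetical helper `env_var_to_config_option` below is not part of Airflow; it only illustrates that naming rule for the three settings marked as required. Setting `orphaned_tasks_check_interval` to 30 seconds presumably makes the scheduler run `adopt_or_reset_orphaned_tasks` often enough to hit the crash soon after the task starts.

```python
from typing import Tuple


def env_var_to_config_option(name: str) -> Tuple[str, str]:
    # Hypothetical helper: split AIRFLOW__{SECTION}__{KEY} into (section, key).
    prefix = "AIRFLOW__"
    if not name.startswith(prefix):
        raise ValueError(f"not an Airflow config variable: {name}")
    section, sep, key = name[len(prefix):].partition("__")
    if not sep or not section or not key:
        raise ValueError(f"expected AIRFLOW__{{SECTION}}__{{KEY}}, got: {name}")
    return section.lower(), key.lower()


required = {
    "AIRFLOW__CORE__EXECUTOR": "CeleryExecutor",
    "AIRFLOW__SCHEDULER__ORPHANED_TASKS_CHECK_INTERVAL": "30.0",
    "AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL": "10",
}

for env_name, value in required.items():
    section, key = env_var_to_config_option(env_name)
    print(f"[{section}] {key} = {value}")
# [core] executor = CeleryExecutor
# [scheduler] orphaned_tasks_check_interval = 30.0
# [scheduler] dag_dir_list_interval = 10
```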
DAG file
```python
from datetime import datetime, timedelta
import time

from airflow import DAG
from airflow.operators.python import PythonOperator

default_args = {
    "owner": "deploy",
    "start_date": datetime(2021, 10, 1),
    "end_date": datetime(2021, 10, 25),
    "retries": 2,
    "retry_delay": timedelta(minutes=20),
    "depends_on_past": False,
}


def sleep_func(t: float):
    print(f"sleeping {t} sec...")
    time.sleep(t)
    print("sleep end...")


with DAG(
    dag_id="active_run_test",
    default_args=default_args,
    schedule_interval="5 0 * * *",
    max_active_runs=32,
    catchup=True,
) as dag:
    t1 = PythonOperator(task_id="sleep_task1", python_callable=sleep_func, op_kwargs={"t": 120})
    t2 = PythonOperator(task_id="sleep_task2", python_callable=sleep_func, op_kwargs={"t": 70})
    t3 = PythonOperator(task_id="sleep_task3", python_callable=sleep_func, op_kwargs={"t": 80})

    t1 >> t2 >> t3
```
1. Open the `active_run_test` DAG page in the web UI.
2. Turn on (unpause) the `active_run_test` DAG.
3. Run the `sleep_task1` task of the run with run_id `scheduled__2021-10-22T15:05:00+00:00`, using "Ignore All Deps".

When `sleep_task1` changes to the `running` state, the scheduler process raises the exception above in the `adopt_or_reset_orphaned_tasks` method.
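
The run_id in step 3 embeds a UTC timestamp. Because `AIRFLOW__CORE__DEFAULT_TIMEZONE` is `Asia/Seoul` (UTC+9, no daylight-saving time), that timestamp is 00:05 local time on 2021-10-23, which matches the `5 0 * * *` schedule. The hypothetical helper `run_id_to_local` below only performs that conversion; it assumes the `scheduled__<ISO-8601 timestamp>` run_id shape seen in this report and is not an Airflow API.

```python
from datetime import datetime, timedelta, timezone

# Asia/Seoul has a fixed UTC+9 offset, so a plain timezone object suffices
# (zoneinfo would need Python 3.9+, while this deployment runs Python 3.8).
SEOUL = timezone(timedelta(hours=9))


def run_id_to_local(run_id: str, tz: timezone) -> datetime:
    # Strip the "scheduled__" prefix and convert the UTC timestamp to `tz`.
    prefix = "scheduled__"
    if not run_id.startswith(prefix):
        raise ValueError(f"not a scheduled run_id: {run_id}")
    return datetime.fromisoformat(run_id[len(prefix):]).astimezone(tz)


local = run_id_to_local("scheduled__2021-10-22T15:05:00+00:00", SEOUL)
print(local.isoformat())  # 2021-10-23T00:05:00+09:00
```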
### Anything else
_No response_
### Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.