nicolamarangoni commented on issue #25343:
URL: https://github.com/apache/airflow/issues/25343#issuecomment-1202095399
It looks like the necessary condition for this serialisation error is just
running the scheduler and the dag-processor in 2 separate containers; the
CeleryExecutor is not needed to reproduce the error.
However, without Celery the error is much rarer, and restarting all services
fixes it.
With Celery, by contrast, restarting the services doesn't fix the issue.
```
[2022-08-02 08:24:24,696] {scheduler_job.py:1381} WARNING - Failing (3) jobs
without heartbeat after 2022-08-02 05:54:24.648908+00:00
[2022-08-02 08:24:24,696] {scheduler_job.py:1389} ERROR - Detected zombie
job: {'full_filepath': '/data/dags/01_sources/adobe/adobe_dag.py', 'msg':
'Detected <TaskInstance: adobe_dag.adobe.prescript
scheduled__2022-08-01T05:00:00+00:00 [running]> as zombie',
'simple_task_instance': <airflow.models.taskinstance.SimpleTaskInstance object
at 0x7effb1585060>, 'is_failure_callback': True}
[2022-08-02 08:24:24,706] {scheduler_job.py:769} ERROR - Exception when
executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
File "/usr/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py",
line 752, in _execute
self._run_scheduler_loop()
File "/usr/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py",
line 873, in _run_scheduler_loop
next_event = timers.run(blocking=False)
File "/usr/lib/python3.10/sched.py", line 151, in run
action(*argument, **kwargs)
File "/usr/lib/python3.10/site-packages/airflow/utils/event_scheduler.py",
line 36, in repeat
action(*args, **kwargs)
File "/usr/lib/python3.10/site-packages/airflow/utils/session.py", line
71, in wrapper
return func(*args, session=session, **kwargs)
File "/usr/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py",
line 1390, in _find_zombies
self.executor.send_callback(request)
File
"/usr/lib/python3.10/site-packages/airflow/executors/base_executor.py", line
363, in send_callback
self.callback_sink.send(request)
File "/usr/lib/python3.10/site-packages/airflow/utils/session.py", line
71, in wrapper
return func(*args, session=session, **kwargs)
File
"/usr/lib/python3.10/site-packages/airflow/callbacks/database_callback_sink.py",
line 34, in send
db_callback = DbCallbackRequest(callback=callback, priority_weight=10)
File "<string>", line 4, in __init__
File "/usr/lib/python3.10/site-packages/sqlalchemy/orm/state.py", line
481, in _initialize_instance
with util.safe_reraise():
File "/usr/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py",
line 70, in __exit__
compat.raise_(
File "/usr/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line
208, in raise_
raise exception
File "/usr/lib/python3.10/site-packages/sqlalchemy/orm/state.py", line
479, in _initialize_instance
return manager.original_init(*mixed[1:], **kwargs)
File
"/usr/lib/python3.10/site-packages/airflow/models/db_callback_request.py", line
44, in __init__
self.callback_data = callback.to_json()
File
"/usr/lib/python3.10/site-packages/airflow/callbacks/callback_requests.py",
line 79, in to_json
return json.dumps(dict_obj)
File "/usr/lib/python3.10/json/__init__.py", line 231, in dumps
return _default_encoder.encode(obj)
File "/usr/lib/python3.10/json/encoder.py", line 199, in encode
chunks = self.iterencode(o, _one_shot=True)
File "/usr/lib/python3.10/json/encoder.py", line 257, in iterencode
return _iterencode(o, 0)
File "/usr/lib/python3.10/json/encoder.py", line 179, in default
raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type datetime is not JSON serializable
[2022-08-02 08:24:24,710] {kubernetes_executor.py:821} INFO - Shutting down
Kubernetes executor
[2022-08-02 08:24:24,710] {kubernetes_executor.py:781} WARNING - Executor
shutting down, will NOT run
task=(TaskInstanceKey(dag_id='refresh_gfk_d_5_min_dag', task_id='check.status',
run_id='manual__2022-08-02T04:00:00+00:00', try_number=1, map_index=-1),
['airflow', 'tasks', 'run', 'refresh_gfk_d_5_min_dag', 'check.status',
'manual__2022-08-02T04:00:00+00:00', '--local', '--subdir',
'DAGS_FOLDER/05_generators/dwh_refresh_generator_dag.py'], None, None)
[2022-08-02 08:24:24,711] {kubernetes_executor.py:781} WARNING - Executor
shutting down, will NOT run
task=(TaskInstanceKey(dag_id='refresh_gfk_d_tagesslots_dag',
task_id='check.status', run_id='manual__2022-08-02T04:00:00+00:00',
try_number=1, map_index=-1), ['airflow', 'tasks', 'run',
'refresh_gfk_d_tagesslots_dag', 'check.status',
'manual__2022-08-02T04:00:00+00:00', '--local', '--subdir',
'DAGS_FOLDER/05_generators/dwh_refresh_generator_dag.py'], None, None)
[2022-08-02 08:24:24,711] {kubernetes_executor.py:781} WARNING - Executor
shutting down, will NOT run
task=(TaskInstanceKey(dag_id='check_gfk_d_tagesslots_dag',
task_id='wait.until.finished', run_id='scheduled__2022-08-02T04:00:00+00:00',
try_number=1, map_index=-1), ['airflow', 'tasks', 'run',
'check_gfk_d_tagesslots_dag', 'wait.until.finished',
'scheduled__2022-08-02T04:00:00+00:00', '--local', '--subdir',
'DAGS_FOLDER/05_generators/check_deliveries_generator_dag.py'], None, None)
[2022-08-02 08:24:24,711] {kubernetes_executor.py:781} WARNING - Executor
shutting down, will NOT run
task=(TaskInstanceKey(dag_id='check_gfk_d_agf_slots_dag',
task_id='wait.until.finished', run_id='scheduled__2022-08-02T04:00:00+00:00',
try_number=1, map_index=-1), ['airflow', 'tasks', 'run',
'check_gfk_d_agf_slots_dag', 'wait.until.finished',
'scheduled__2022-08-02T04:00:00+00:00', '--local', '--subdir',
'DAGS_FOLDER/05_generators/check_deliveries_generator_dag.py'], None, None)
[2022-08-02 08:24:24,711] {kubernetes_executor.py:781} WARNING - Executor
shutting down, will NOT run
task=(TaskInstanceKey(dag_id='check_gfk_d_5_min_dag',
task_id='wait.until.finished', run_id='scheduled__2022-08-02T04:00:00+00:00',
try_number=1, map_index=-1), ['airflow', 'tasks', 'run',
'check_gfk_d_5_min_dag', 'wait.until.finished',
'scheduled__2022-08-02T04:00:00+00:00', '--local', '--subdir',
'DAGS_FOLDER/05_generators/check_deliveries_generator_dag.py'], None, None)
[2022-08-02 08:24:24,712] {kubernetes_executor.py:781} WARNING - Executor
shutting down, will NOT run
task=(TaskInstanceKey(dag_id='check_gfk_d_ausstrahlungen_dag',
task_id='finished', run_id='scheduled__2022-08-02T04:00:00+00:00',
try_number=1, map_index=-1), ['airflow', 'tasks', 'run',
'check_gfk_d_ausstrahlungen_dag', 'finished',
'scheduled__2022-08-02T04:00:00+00:00', '--local', '--subdir',
'DAGS_FOLDER/05_generators/check_deliveries_generator_dag.py'], None, None)
[2022-08-02 08:24:24,712] {kubernetes_executor.py:781} WARNING - Executor
shutting down, will NOT run
task=(TaskInstanceKey(dag_id='check_gfk_d_sendungsteile_dag',
task_id='finished', run_id='scheduled__2022-08-02T04:00:00+00:00',
try_number=1, map_index=-1), ['airflow', 'tasks', 'run',
'check_gfk_d_sendungsteile_dag', 'finished',
'scheduled__2022-08-02T04:00:00+00:00', '--local', '--subdir',
'DAGS_FOLDER/01_sources/gfk_d_sendungsteile/gfk_d_sendungsteile.py'], None,
None)
[2022-08-02 08:24:24,712] {kubernetes_executor.py:781} WARNING - Executor
shutting down, will NOT run task=(TaskInstanceKey(dag_id='freewheel_dag',
task_id='check.prescript', run_id='scheduled__2022-08-02T05:20:00+00:00',
try_number=1, map_index=-1), ['airflow', 'tasks', 'run', 'freewheel_dag',
'check.prescript', 'scheduled__2022-08-02T05:20:00+00:00', '--local',
'--subdir', 'DAGS_FOLDER/01_sources/freewheel/freewheel_dag.py'], None, None)
[2022-08-02 08:24:24,747] {scheduler_job.py:781} INFO - Exited execute loop
Traceback (most recent call last):
File "/usr/bin/airflow", line 8, in <module>
sys.exit(main())
File "/usr/lib/python3.10/site-packages/airflow/__main__.py", line 38, in
main
args.func(args)
File "/usr/lib/python3.10/site-packages/airflow/cli/cli_parser.py", line
51, in command
return func(*args, **kwargs)
File "/usr/lib/python3.10/site-packages/airflow/utils/cli.py", line 99, in
wrapper
return f(*args, **kwargs)
File
"/usr/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py",
line 75, in scheduler
_run_scheduler_job(args=args)
File
"/usr/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py",
line 46, in _run_scheduler_job
job.run()
File "/usr/lib/python3.10/site-packages/airflow/jobs/base_job.py", line
244, in run
self._execute()
File "/usr/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py",
line 752, in _execute
self._run_scheduler_loop()
File "/usr/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py",
line 873, in _run_scheduler_loop
next_event = timers.run(blocking=False)
File "/usr/lib/python3.10/sched.py", line 151, in run
action(*argument, **kwargs)
File "/usr/lib/python3.10/site-packages/airflow/utils/event_scheduler.py",
line 36, in repeat
action(*args, **kwargs)
File "/usr/lib/python3.10/site-packages/airflow/utils/session.py", line
71, in wrapper
return func(*args, session=session, **kwargs)
File "/usr/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py",
line 1390, in _find_zombies
self.executor.send_callback(request)
File
"/usr/lib/python3.10/site-packages/airflow/executors/base_executor.py", line
363, in send_callback
self.callback_sink.send(request)
File "/usr/lib/python3.10/site-packages/airflow/utils/session.py", line
71, in wrapper
return func(*args, session=session, **kwargs)
File
"/usr/lib/python3.10/site-packages/airflow/callbacks/database_callback_sink.py",
line 34, in send
db_callback = DbCallbackRequest(callback=callback, priority_weight=10)
File "<string>", line 4, in __init__
File "/usr/lib/python3.10/site-packages/sqlalchemy/orm/state.py", line
481, in _initialize_instance
with util.safe_reraise():
File "/usr/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py",
line 70, in __exit__
compat.raise_(
File "/usr/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line
208, in raise_
raise exception
File "/usr/lib/python3.10/site-packages/sqlalchemy/orm/state.py", line
479, in _initialize_instance
return manager.original_init(*mixed[1:], **kwargs)
File
"/usr/lib/python3.10/site-packages/airflow/models/db_callback_request.py", line
44, in __init__
self.callback_data = callback.to_json()
File
"/usr/lib/python3.10/site-packages/airflow/callbacks/callback_requests.py",
line 79, in to_json
return json.dumps(dict_obj)
File "/usr/lib/python3.10/json/__init__.py", line 231, in dumps
return _default_encoder.encode(obj)
File "/usr/lib/python3.10/json/encoder.py", line 199, in encode
chunks = self.iterencode(o, _one_shot=True)
File "/usr/lib/python3.10/json/encoder.py", line 257, in iterencode
return _iterencode(o, 0)
File "/usr/lib/python3.10/json/encoder.py", line 179, in default
raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type datetime is not JSON serializable
```
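
For reference, the final `TypeError` in the traceback is what the standard-library `json.dumps` raises whenever the object it is given contains a `datetime`. The snippet below is only a minimal sketch of that failure outside Airflow: the `payload` dict and the helper names `dumps_error` and `encode_datetime` are hypothetical, and where exactly the `datetime` sits inside the real callback request is an assumption. The `default=` hook is shown as one generic way to make such a payload serialisable, not as the fix Airflow uses.

```python
import json
from datetime import datetime, timezone

# Hypothetical stand-in for the dict that to_json() hands to json.dumps.
# Assumption: a datetime value is nested somewhere inside the payload.
payload = {
    "full_filepath": "/data/dags/01_sources/adobe/adobe_dag.py",
    "is_failure_callback": True,
    "simple_task_instance": {
        "start_date": datetime(2022, 8, 1, 5, 0, tzinfo=timezone.utc),
    },
}


def dumps_error(obj):
    """Return the TypeError message json.dumps raises for obj, or None."""
    try:
        json.dumps(obj)
    except TypeError as exc:
        return str(exc)
    return None


def encode_datetime(value):
    """Fallback for json.dumps: render datetimes as ISO 8601 strings."""
    if isinstance(value, datetime):
        return value.isoformat()
    raise TypeError(
        f"Object of type {type(value).__name__} is not JSON serializable"
    )


# Plain json.dumps fails with the same message as in the scheduler log.
error_message = dumps_error(payload)

# With a default= hook the same payload serialises cleanly.
serialized = json.dumps(payload, default=encode_datetime)
```

Here `error_message` holds the same text as the last line of the traceback, while `serialized` is a JSON string with the datetime rendered in ISO 8601 form.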