nicolamarangoni commented on issue #25343:
URL: https://github.com/apache/airflow/issues/25343#issuecomment-1202095399

   It looks like the only condition needed for this serialisation error is 
running the scheduler and the dag-processor in 2 separate containers. The 
CeleryExecutor is not needed to reproduce the error.
   However, without Celery the error is much rarer, and restarting all services 
fixes it.
   With Celery, by contrast, restarting the services doesn't fix the issue.
   
   ```
   [2022-08-02 08:24:24,696] {scheduler_job.py:1381} WARNING - Failing (3) jobs 
without heartbeat after 2022-08-02 05:54:24.648908+00:00
   [2022-08-02 08:24:24,696] {scheduler_job.py:1389} ERROR - Detected zombie 
job: {'full_filepath': '/data/dags/01_sources/adobe/adobe_dag.py', 'msg': 
'Detected <TaskInstance: adobe_dag.adobe.prescript 
scheduled__2022-08-01T05:00:00+00:00 [running]> as zombie', 
'simple_task_instance': <airflow.models.taskinstance.SimpleTaskInstance object 
at 0x7effb1585060>, 'is_failure_callback': True}
   [2022-08-02 08:24:24,706] {scheduler_job.py:769} ERROR - Exception when 
executing SchedulerJob._run_scheduler_loop
   Traceback (most recent call last):
     File "/usr/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", 
line 752, in _execute
       self._run_scheduler_loop()
     File "/usr/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", 
line 873, in _run_scheduler_loop
       next_event = timers.run(blocking=False)
     File "/usr/lib/python3.10/sched.py", line 151, in run
       action(*argument, **kwargs)
     File "/usr/lib/python3.10/site-packages/airflow/utils/event_scheduler.py", 
line 36, in repeat
       action(*args, **kwargs)
     File "/usr/lib/python3.10/site-packages/airflow/utils/session.py", line 
71, in wrapper
       return func(*args, session=session, **kwargs)
     File "/usr/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", 
line 1390, in _find_zombies
       self.executor.send_callback(request)
     File 
"/usr/lib/python3.10/site-packages/airflow/executors/base_executor.py", line 
363, in send_callback
       self.callback_sink.send(request)
     File "/usr/lib/python3.10/site-packages/airflow/utils/session.py", line 
71, in wrapper
       return func(*args, session=session, **kwargs)
     File 
"/usr/lib/python3.10/site-packages/airflow/callbacks/database_callback_sink.py",
 line 34, in send
       db_callback = DbCallbackRequest(callback=callback, priority_weight=10)
     File "<string>", line 4, in __init__
     File "/usr/lib/python3.10/site-packages/sqlalchemy/orm/state.py", line 
481, in _initialize_instance
       with util.safe_reraise():
     File "/usr/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py", 
line 70, in __exit__
       compat.raise_(
     File "/usr/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 
208, in raise_
       raise exception
     File "/usr/lib/python3.10/site-packages/sqlalchemy/orm/state.py", line 
479, in _initialize_instance
       return manager.original_init(*mixed[1:], **kwargs)
     File 
"/usr/lib/python3.10/site-packages/airflow/models/db_callback_request.py", line 
44, in __init__
       self.callback_data = callback.to_json()
     File 
"/usr/lib/python3.10/site-packages/airflow/callbacks/callback_requests.py", 
line 79, in to_json
       return json.dumps(dict_obj)
     File "/usr/lib/python3.10/json/__init__.py", line 231, in dumps
       return _default_encoder.encode(obj)
     File "/usr/lib/python3.10/json/encoder.py", line 199, in encode
       chunks = self.iterencode(o, _one_shot=True)
     File "/usr/lib/python3.10/json/encoder.py", line 257, in iterencode
       return _iterencode(o, 0)
     File "/usr/lib/python3.10/json/encoder.py", line 179, in default
       raise TypeError(f'Object of type {o.__class__.__name__} '
   TypeError: Object of type datetime is not JSON serializable
   [2022-08-02 08:24:24,710] {kubernetes_executor.py:821} INFO - Shutting down 
Kubernetes executor
   [2022-08-02 08:24:24,710] {kubernetes_executor.py:781} WARNING - Executor 
shutting down, will NOT run 
task=(TaskInstanceKey(dag_id='refresh_gfk_d_5_min_dag', task_id='check.status', 
run_id='manual__2022-08-02T04:00:00+00:00', try_number=1, map_index=-1), 
['airflow', 'tasks', 'run', 'refresh_gfk_d_5_min_dag', 'check.status', 
'manual__2022-08-02T04:00:00+00:00', '--local', '--subdir', 
'DAGS_FOLDER/05_generators/dwh_refresh_generator_dag.py'], None, None)
   [2022-08-02 08:24:24,711] {kubernetes_executor.py:781} WARNING - Executor 
shutting down, will NOT run 
task=(TaskInstanceKey(dag_id='refresh_gfk_d_tagesslots_dag', 
task_id='check.status', run_id='manual__2022-08-02T04:00:00+00:00', 
try_number=1, map_index=-1), ['airflow', 'tasks', 'run', 
'refresh_gfk_d_tagesslots_dag', 'check.status', 
'manual__2022-08-02T04:00:00+00:00', '--local', '--subdir', 
'DAGS_FOLDER/05_generators/dwh_refresh_generator_dag.py'], None, None)
   [2022-08-02 08:24:24,711] {kubernetes_executor.py:781} WARNING - Executor 
shutting down, will NOT run 
task=(TaskInstanceKey(dag_id='check_gfk_d_tagesslots_dag', 
task_id='wait.until.finished', run_id='scheduled__2022-08-02T04:00:00+00:00', 
try_number=1, map_index=-1), ['airflow', 'tasks', 'run', 
'check_gfk_d_tagesslots_dag', 'wait.until.finished', 
'scheduled__2022-08-02T04:00:00+00:00', '--local', '--subdir', 
'DAGS_FOLDER/05_generators/check_deliveries_generator_dag.py'], None, None)
   [2022-08-02 08:24:24,711] {kubernetes_executor.py:781} WARNING - Executor 
shutting down, will NOT run 
task=(TaskInstanceKey(dag_id='check_gfk_d_agf_slots_dag', 
task_id='wait.until.finished', run_id='scheduled__2022-08-02T04:00:00+00:00', 
try_number=1, map_index=-1), ['airflow', 'tasks', 'run', 
'check_gfk_d_agf_slots_dag', 'wait.until.finished', 
'scheduled__2022-08-02T04:00:00+00:00', '--local', '--subdir', 
'DAGS_FOLDER/05_generators/check_deliveries_generator_dag.py'], None, None)
   [2022-08-02 08:24:24,711] {kubernetes_executor.py:781} WARNING - Executor 
shutting down, will NOT run 
task=(TaskInstanceKey(dag_id='check_gfk_d_5_min_dag', 
task_id='wait.until.finished', run_id='scheduled__2022-08-02T04:00:00+00:00', 
try_number=1, map_index=-1), ['airflow', 'tasks', 'run', 
'check_gfk_d_5_min_dag', 'wait.until.finished', 
'scheduled__2022-08-02T04:00:00+00:00', '--local', '--subdir', 
'DAGS_FOLDER/05_generators/check_deliveries_generator_dag.py'], None, None)
   [2022-08-02 08:24:24,712] {kubernetes_executor.py:781} WARNING - Executor 
shutting down, will NOT run 
task=(TaskInstanceKey(dag_id='check_gfk_d_ausstrahlungen_dag', 
task_id='finished', run_id='scheduled__2022-08-02T04:00:00+00:00', 
try_number=1, map_index=-1), ['airflow', 'tasks', 'run', 
'check_gfk_d_ausstrahlungen_dag', 'finished', 
'scheduled__2022-08-02T04:00:00+00:00', '--local', '--subdir', 
'DAGS_FOLDER/05_generators/check_deliveries_generator_dag.py'], None, None)
   [2022-08-02 08:24:24,712] {kubernetes_executor.py:781} WARNING - Executor 
shutting down, will NOT run 
task=(TaskInstanceKey(dag_id='check_gfk_d_sendungsteile_dag', 
task_id='finished', run_id='scheduled__2022-08-02T04:00:00+00:00', 
try_number=1, map_index=-1), ['airflow', 'tasks', 'run', 
'check_gfk_d_sendungsteile_dag', 'finished', 
'scheduled__2022-08-02T04:00:00+00:00', '--local', '--subdir', 
'DAGS_FOLDER/01_sources/gfk_d_sendungsteile/gfk_d_sendungsteile.py'], None, 
None)
   [2022-08-02 08:24:24,712] {kubernetes_executor.py:781} WARNING - Executor 
shutting down, will NOT run task=(TaskInstanceKey(dag_id='freewheel_dag', 
task_id='check.prescript', run_id='scheduled__2022-08-02T05:20:00+00:00', 
try_number=1, map_index=-1), ['airflow', 'tasks', 'run', 'freewheel_dag', 
'check.prescript', 'scheduled__2022-08-02T05:20:00+00:00', '--local', 
'--subdir', 'DAGS_FOLDER/01_sources/freewheel/freewheel_dag.py'], None, None)
   [2022-08-02 08:24:24,747] {scheduler_job.py:781} INFO - Exited execute loop
   Traceback (most recent call last):
     File "/usr/bin/airflow", line 8, in <module>
       sys.exit(main())
     File "/usr/lib/python3.10/site-packages/airflow/__main__.py", line 38, in 
main
       args.func(args)
     File "/usr/lib/python3.10/site-packages/airflow/cli/cli_parser.py", line 
51, in command
       return func(*args, **kwargs)
     File "/usr/lib/python3.10/site-packages/airflow/utils/cli.py", line 99, in 
wrapper
       return f(*args, **kwargs)
     File 
"/usr/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py", 
line 75, in scheduler
       _run_scheduler_job(args=args)
     File 
"/usr/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py", 
line 46, in _run_scheduler_job
       job.run()
     File "/usr/lib/python3.10/site-packages/airflow/jobs/base_job.py", line 
244, in run
       self._execute()
     File "/usr/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", 
line 752, in _execute
       self._run_scheduler_loop()
     File "/usr/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", 
line 873, in _run_scheduler_loop
       next_event = timers.run(blocking=False)
     File "/usr/lib/python3.10/sched.py", line 151, in run
       action(*argument, **kwargs)
     File "/usr/lib/python3.10/site-packages/airflow/utils/event_scheduler.py", 
line 36, in repeat
       action(*args, **kwargs)
     File "/usr/lib/python3.10/site-packages/airflow/utils/session.py", line 
71, in wrapper
       return func(*args, session=session, **kwargs)
     File "/usr/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", 
line 1390, in _find_zombies
       self.executor.send_callback(request)
     File 
"/usr/lib/python3.10/site-packages/airflow/executors/base_executor.py", line 
363, in send_callback
       self.callback_sink.send(request)
     File "/usr/lib/python3.10/site-packages/airflow/utils/session.py", line 
71, in wrapper
       return func(*args, session=session, **kwargs)
     File 
"/usr/lib/python3.10/site-packages/airflow/callbacks/database_callback_sink.py",
 line 34, in send
       db_callback = DbCallbackRequest(callback=callback, priority_weight=10)
     File "<string>", line 4, in __init__
     File "/usr/lib/python3.10/site-packages/sqlalchemy/orm/state.py", line 
481, in _initialize_instance
       with util.safe_reraise():
     File "/usr/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py", 
line 70, in __exit__
       compat.raise_(
     File "/usr/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 
208, in raise_
       raise exception
     File "/usr/lib/python3.10/site-packages/sqlalchemy/orm/state.py", line 
479, in _initialize_instance
       return manager.original_init(*mixed[1:], **kwargs)
     File 
"/usr/lib/python3.10/site-packages/airflow/models/db_callback_request.py", line 
44, in __init__
       self.callback_data = callback.to_json()
     File 
"/usr/lib/python3.10/site-packages/airflow/callbacks/callback_requests.py", 
line 79, in to_json
       return json.dumps(dict_obj)
     File "/usr/lib/python3.10/json/__init__.py", line 231, in dumps
       return _default_encoder.encode(obj)
     File "/usr/lib/python3.10/json/encoder.py", line 199, in encode
       chunks = self.iterencode(o, _one_shot=True)
     File "/usr/lib/python3.10/json/encoder.py", line 257, in iterencode
       return _iterencode(o, 0)
     File "/usr/lib/python3.10/json/encoder.py", line 179, in default
       raise TypeError(f'Object of type {o.__class__.__name__} '
   TypeError: Object of type datetime is not JSON serializable
   
   ```
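
The final frames of the traceback can be reproduced outside Airflow. The following is a minimal sketch, not Airflow's code or its fix: `payload`, its field names, `try_dump` and `encode_datetime` are hypothetical stand-ins, assuming only that the dict passed to `json.dumps` in `to_json` contains a `datetime` value somewhere.

```python
import json
from datetime import datetime, timezone

# Hypothetical stand-in for the callback dict; the field names are
# illustrative and not taken from Airflow's CallbackRequest classes.
payload = {
    "full_filepath": "/data/dags/01_sources/adobe/adobe_dag.py",
    "start_date": datetime(2022, 8, 1, 5, 0, tzinfo=timezone.utc),
}


def try_dump(obj):
    """Return the JSON string, or the error text if encoding fails."""
    try:
        return json.dumps(obj)
    except TypeError as exc:
        return f"TypeError: {exc}"


def encode_datetime(value):
    """Fallback for json.dumps: render datetimes as ISO 8601 strings."""
    if isinstance(value, datetime):
        return value.isoformat()
    raise TypeError(
        f"Object of type {type(value).__name__} is not JSON serializable"
    )


# The plain call fails the same way as the last line of the traceback.
failure = try_dump(payload)

# Passing a `default` hook lets the standard encoder handle the datetime.
encoded = json.dumps(payload, default=encode_datetime)

print(failure)
print(encoded)
```

The `default` hook is only one possible workaround. Converting the datetime to a string before building the dict would have the same effect.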


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
