kiros19 commented on issue #49966:
URL: https://github.com/apache/airflow/issues/49966#issuecomment-3845726832

   I've got the same error on setup: 
   Airlfow 3.1.6 + apache-airflow-providers-fab 3.1.2 (trying to solve 
apiserver UI not opening after some time) 
   CeleryExecutor + rabbitMQ:3-management
   Docker
   
   I've seen the error before, but i believe some reloads, version changes made 
it dissapear someway
   The problem as i see it:
   Celery has some buit-in log formatting setting, that's failing
   That's causing worker to close AMQP connection to rabbit
   
   I believe that logging config is built-in as my config is default and has 
'taskname' everywhere, not task_name
   And logs were not modified from scripts or anything, it's clean
   
   The №1 sus thing is that airflow jobs still executed ok. Nothing is failing. 
   But how could it be ok, if AMQP is closed instanly on opening, due to that 
celery logging error?
   
   Other sus thing - Error is not appearing for my prod setup:
   Airlfow 3.1.5 + airflow-providers-fab 3.0.3
   And I'm not sure I've seen it for my prev test setup (not 100% sure here):
   Airlfow 3.1.6 + apache-airflow-providers-fab 3.1.1
   
   Below are my rabbit,celery logs and my logging config
   
   RABBITMQ:
   --ok
   2026-02-04 06:46:26.518060+00:00 [info] <0.3105.0> accepting AMQP connection 
<0.3105.0> (172.18.0.4:46058 -> 172.18.0.2:5672)
   2026-02-04 06:46:26.520354+00:00 [info] <0.3105.0> connection <0.3105.0> 
(172.18.0.4:46058 -> 172.18.0.2:5672): user 'airflow' authenticated and granted 
access to vhost 'airflow_queue'
   2026-02-04 06:46:26.528031+00:00 [info] <0.3122.0> accepting AMQP connection 
<0.3122.0> (172.18.0.4:46064 -> 172.18.0.2:5672)
   2026-02-04 06:46:26.529746+00:00 [info] <0.3122.0> connection <0.3122.0> 
(172.18.0.4:46064 -> 172.18.0.2:5672): user 'airflow' authenticated and granted 
access to vhost 'airflow_queue'
   --instanlty closing
   2026-02-04 06:46:26.922223+00:00 [warning] <0.3105.0> closing AMQP 
connection <0.3105.0> (172.18.0.4:46058 -> 172.18.0.2:5672, vhost: 
'airflow_queue', user: 'airflow'):
   2026-02-04 06:46:26.922223+00:00 [warning] <0.3105.0> client unexpectedly 
closed TCP connection
   2026-02-04 06:46:26.922550+00:00 [warning] <0.3122.0> closing AMQP 
connection <0.3122.0> (172.18.0.4:46064 -> 172.18.0.2:5672, vhost: 
'airflow_queue', user: 'airflow'):
   2026-02-04 06:46:26.922550+00:00 [warning] <0.3122.0> client unexpectedly 
closed TCP connection
   
   WORKER:
   --- Logging error ---
   Traceback (most recent call last):
     File "/usr/python/lib/python3.12/logging/__init__.py", line 464, in format
       return self._format(record)
              ^^^^^^^^^^^^^^^^^^^^
     File "/usr/python/lib/python3.12/logging/__init__.py", line 460, in _format
       return self._fmt % values
              ~~~~~~~~~~^~~~~~~~
   KeyError: 'task_name'
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File "/usr/python/lib/python3.12/logging/__init__.py", line 1160, in emit
       msg = self.format(record)
             ^^^^^^^^^^^^^^^^^^^
     File "/usr/python/lib/python3.12/logging/__init__.py", line 999, in format
       return fmt.format(record)
              ^^^^^^^^^^^^^^^^^^
     File "/usr/python/lib/python3.12/logging/__init__.py", line 706, in format
       s = self.formatMessage(record)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/python/lib/python3.12/logging/__init__.py", line 675, in 
formatMessage
       return self._style.format(record)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/python/lib/python3.12/logging/__init__.py", line 466, in format
       raise ValueError('Formatting field not found in record: %s' % e)
   ValueError: Formatting field not found in record: 'task_name'
   Call stack:
     File "/home/airflow/.local/bin/airflow", line 7, in <module>
       sys.exit(main())
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/__main__.py", line 
55, in main
       args.func(args)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/cli_config.py", 
line 49, in command
       return func(*args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/cli.py", line 
114, in wrapper
       return f(*args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/celery/cli/celery_command.py",
 line 66, in wrapper
       providers_configuration_loaded(func)(*args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/providers_configuration_loader.py",
 line 54, in wrapped_function
       return func(*args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/celery/cli/celery_command.py",
 line 293, in worker
       _run_command_with_daemon_option(
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/celery/cli/celery_command.py",
 line 52, in _run_command_with_daemon_option
       run_command_with_daemon_option(*args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/daemon_utils.py",
 line 86, in run_command_with_daemon_option
       callback()
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/celery/cli/celery_command.py",
 line 286, in run_celery_worker
       celery_app.worker_main(options)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/celery/app/base.py", line 
487, in worker_main
       self.start(argv=argv)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/celery/app/base.py", line 
467, in start
       celery.main(args=argv, standalone_mode=False)
     File "/home/airflow/.local/lib/python3.12/site-packages/click/core.py", 
line 1406, in main
       rv = self.invoke(ctx)
     File "/home/airflow/.local/lib/python3.12/site-packages/click/core.py", 
line 1873, in invoke
       return _process_result(sub_ctx.command.invoke(sub_ctx))
     File "/home/airflow/.local/lib/python3.12/site-packages/click/core.py", 
line 1269, in invoke
       return ctx.invoke(self.callback, **ctx.params)
     File "/home/airflow/.local/lib/python3.12/site-packages/click/core.py", 
line 824, in invoke
       return callback(*args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/click/decorators.py", line 
34, in new_func
       return f(get_current_context(), *args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/celery/bin/base.py", line 
135, in caller
       return f(ctx, *args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/celery/bin/worker.py", line 
367, in worker
       worker.start()
     File 
"/home/airflow/.local/lib/python3.12/site-packages/celery/worker/worker.py", 
line 203, in start
       self.blueprint.start(self)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/celery/bootsteps.py", line 
116, in start
       step.start(parent)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/celery/bootsteps.py", line 
365, in start
       return self.obj.start()
     File 
"/home/airflow/.local/lib/python3.12/site-packages/celery/worker/consumer/consumer.py",
 line 346, in start
       blueprint.start(self)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/celery/bootsteps.py", line 
116, in start
       step.start(parent)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/celery/worker/consumer/consumer.py",
 line 788, in start
       c.loop(*c.loop_args())
     File 
"/home/airflow/.local/lib/python3.12/site-packages/celery/worker/loops.py", 
line 96, in asynloop
       next(loop)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/kombu/asynchronous/hub.py", 
line 310, in create_loop
       item()
     File "/home/airflow/.local/lib/python3.12/site-packages/vine/promises.py", 
line 158, in __call__
       retval = fun(*final_args, **final_kwargs)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/kombu/transport/base.py", 
line 230, in _read
       drain_events(timeout=0)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/amqp/connection.py", line 
526, in drain_events
       while not self.blocking_read(timeout):
     File 
"/home/airflow/.local/lib/python3.12/site-packages/amqp/connection.py", line 
532, in blocking_read
       return self.on_inbound_frame(frame)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/amqp/method_framing.py", 
line 77, in on_frame
       callback(channel, msg.frame_method, msg.frame_args, msg)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/amqp/connection.py", line 
538, in on_inbound_method
       return self.channels[channel_id].dispatch_method(
   
     File 
"/home/airflow/.local/lib/python3.12/site-packages/amqp/abstract_channel.py", 
line 156, in dispatch_method
   
       listener(*args)
     File "/home/airflow/.local/lib/python3.12/site-packages/amqp/channel.py", 
line 1629, in _on_basic_deliver
       fun(msg)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/kombu/messaging.py", line 
668, in _receive_callback
       return on_m(message) if on_m else self.receive(decoded, message)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/celery/worker/consumer/consumer.py",
 line 694, in on_task_received
       strategy(
     File 
"/home/airflow/.local/lib/python3.12/site-packages/celery/worker/strategy.py", 
line 207, in task_message_handler
       [callback(req) for callback in callbacks]
     File 
"/home/airflow/.local/lib/python3.12/site-packages/celery/worker/autoscale.py", 
line 96, in maybe_scale
       self.pool.maintain_pool()
     File "/home/airflow/.local/lib/python3.12/site-packages/billiard/pool.py", 
line 1351, in maintain_pool
       self._maintain_pool()
     File "/home/airflow/.local/lib/python3.12/site-packages/billiard/pool.py", 
line 1343, in _maintain_pool
       self._repopulate_pool(joined)
     File "/home/airflow/.local/lib/python3.12/site-packages/billiard/pool.py", 
line 1328, in _repopulate_pool
       self._create_worker_process(self._avail_index())
     File 
"/home/airflow/.local/lib/python3.12/site-packages/celery/concurrency/asynpool.py",
 line 493, in _create_worker_process
       return super()._create_worker_process(i)
     File "/home/airflow/.local/lib/python3.12/site-packages/billiard/pool.py", 
line 1158, in _create_worker_process
       w.start()
     File 
"/home/airflow/.local/lib/python3.12/site-packages/billiard/process.py", line 
120, in start
       self._popen = self._Popen(self)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/billiard/context.py", line 
331, in _Popen
       return Popen(process_obj)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/billiard/popen_fork.py", 
line 22, in __init__
       self._launch(process_obj)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/billiard/popen_fork.py", 
line 77, in _launch
       code = process_obj._bootstrap()
     File 
"/home/airflow/.local/lib/python3.12/site-packages/billiard/process.py", line 
323, in _bootstrap
       self.run()
     File 
"/home/airflow/.local/lib/python3.12/site-packages/billiard/process.py", line 
110, in run
       self._target(*self._args, **self._kwargs)
     File "/home/airflow/.local/lib/python3.12/site-packages/billiard/pool.py", 
line 292, in __call__
       sys.exit(self.workloop(pid=pid))
     File "/home/airflow/.local/lib/python3.12/site-packages/billiard/pool.py", 
line 362, in workloop
       result = (True, prepare_result(fun(*args, **kwargs)))
     File 
"/home/airflow/.local/lib/python3.12/site-packages/celery/app/trace.py", line 
694, in fast_trace_task
       R, I, T, Rstr = tasks[task].__trace__(
     File 
"/home/airflow/.local/lib/python3.12/site-packages/celery/app/trace.py", line 
479, in trace_task
       R = retval = fun(*args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/celery/app/trace.py", line 
779, in __protected_call__
       return self.run(*args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/celery/executors/celery_executor_utils.py",
 line 154, in execute_workload
       log.info("[%s] Executing workload in Celery: %s", celery_task_id, 
workload)
   Message: '[%s] Executing workload in Celery: %s'
   
   CONFIG:
   [logging]
   base_log_folder = /opt/airflow/logs
   remote_logging = False
   remote_log_conn_id =
   delete_local_logs = False
   google_key_path =
   remote_base_log_folder =
   remote_task_handler_kwargs =
   encrypt_s3_logs = False
   logging_level = INFO
   celery_logging_level = WARNING
   fab_logging_level = WARNING
   logging_config_class =
   colored_console_log = True
   #log_format = [%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - 
%(message)s
   log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - 
%%(message)s
   #log_format = %(asctime)s [%(levelname)s] %(message)s
   callsite_parameters = filename,lineno
   #simple_log_format = %(asctime)s %(levelname)s - %(message)s
   simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
   dag_processor_log_target = file
   dag_processor_log_format = [%%(asctime)s] [SOURCE:DAG_PROCESSOR] 
{%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
   dag_processor_child_process_log_directory = /opt/airflow/logs/dag_processor
   log_formatter_class = airflow.utils.log.timezone_aware.TimezoneAware
   secret_mask_adapter =
   min_length_masked_secret = 5
   task_log_prefix_template =
   log_filename_template = dag_id={{ ti.dag_id }}/run_id={{ ti.run_id 
}}/task_id={{ ti.task_id }}/{%% if ti.map_index >= 0 %%}map_index={{ 
ti.map_index }}/{%% endif %%}attempt={{ try_number|default(ti.try_number) }}.log
   task_log_reader = task
   extra_logger_names =
   worker_log_server_port = 8793
   trigger_log_server_port = 8794
   # interleave_timestamp_parser =
   file_task_handler_new_folder_permissions = 0o775
   file_task_handler_new_file_permissions = 0o664
   celery_stdout_stderr_separation = False
   color_log_error_keywords = error,exception
   color_log_warning_keywords = warn
   [celery_kubernetes_executor]
   kubernetes_queue = kubernetes
   [celery]
   celery_app_name = airflow.providers.celery.executors.celery_executor
   worker_concurrency = 16
   # worker_autoscale =
   worker_prefetch_multiplier = 1
   worker_enable_remote_control = true
   broker_url = pyamqp://airflow_mcfm:3hJflp5jVSHm@rabbitmq/mcfm
   result_backend = 
db+postgresql://airflow_mcfm:[email protected]:15432/airflow_mcfm
   result_backend_sqlalchemy_engine_options =
   flower_host = 0.0.0.0
   flower_url_prefix =
   flower_port = 5555
   flower_basic_auth =
   sync_parallelism = 0
   celery_config_options = 
airflow.providers.celery.executors.default_celery.DEFAULT_CELERY_CONFIG
   ssl_active = False
   ssl_key =
   ssl_cert =
   ssl_cacert =
   pool = prefork
   operation_timeout = 1.0
   task_acks_late = True
   task_track_started = True
   task_publish_max_retries = 3
   extra_celery_config = {}
   #extra_celery_config = {"global_qos": false} -- False должно быть, как в 
питоне, не false. Но настройка фолс по дефолту
   #broker_transport_options = {"global_qos": false}
   # worker_umask =
   [celery_broker_transport_options]
   # visibility_timeout =
   # sentinel_kwargs =
   
   And here is everything i've got on logging in docker compose (no celery 
logging format related stuff)
       AIRFLOW__LOGGING__BASE_LOG_FOLDER: '/opt/airflow/logs'
       AIRFLOW__LOGGING__DAG_PROCESSOR_CHILD_PROCESS_LOG_DIRECTORY: 
'/opt/airflow/logs/dag_processor_logs'
       AIRFLOW__LOGGING__REMOTE_LOGGING: 'true'
       AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER: 's3://dev/logs'
       AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID: 'minio_s3_conn'
       AIRFLOW__LOGGING__DELETE_LOCAL_LOGS: 'true'


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to