EtsuNDmA opened a new issue #17336:
URL: https://github.com/apache/airflow/issues/17336
- **Apache Airflow version**: 2.1.2
- **Python**: Python 3.8.5
- **Executor**: CeleryExecutor
**Environment**:
- **OS** (e.g. from /etc/os-release): Debian GNU/Linux 10 (buster)
- **Kernel** (e.g. `uname -a`): Linux 074f2dc34905 5.10.25-linuxkit
**What happened**:
We use remote logging to Elasticsearch. After upgrading from 1.10.15 to 2.1.2,
the Celery worker fails on every task when `logging.log_format` contains
`asctime` but `elasticsearch.json_fields` does not.
Worker logs:
```
[2021-07-30 09:29:04,120: ERROR/ForkPoolWorker-1] Failed to execute task maximum recursion depth exceeded while calling a Python object.
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/logging/__init__.py", line 436, in format
    return self._format(record)
  File "/usr/local/lib/python3.8/logging/__init__.py", line 432, in _format
    return self._fmt % record.__dict__
KeyError: 'asctime'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/logging/__init__.py", line 1081, in emit
    msg = self.format(record)
  File "/usr/local/lib/python3.8/logging/__init__.py", line 925, in format
    return fmt.format(record)
  File "/airflow/env/lib/python3.8/site-packages/airflow/utils/log/json_formatter.py", line 43, in format
    super().format(record)
  File "/usr/local/lib/python3.8/logging/__init__.py", line 667, in format
    s = self.formatMessage(record)
  File "/usr/local/lib/python3.8/logging/__init__.py", line 636, in formatMessage
    return self._style.format(record)
  File "/usr/local/lib/python3.8/logging/__init__.py", line 438, in format
    raise ValueError('Formatting field not found in record: %s' % e)
ValueError: Formatting field not found in record: 'asctime'

... repeated many times until RecursionError ...

  File "/usr/local/lib/python3.8/posixpath.py", line 143, in basename
    sep = _get_sep(p)
  File "/usr/local/lib/python3.8/posixpath.py", line 42, in _get_sep
    if isinstance(path, bytes):
RecursionError: maximum recursion depth exceeded while calling a Python object
[2021-07-30 09:29:04,159: ERROR/ForkPoolWorker-1] Task airflow.executors.celery_executor.execute_command[505225e3-267c-4661-87af-8618a4f7d202] raised unexpected: AirflowException('Celery command failed on host: b0b216e2ccea')
Traceback (most recent call last):
  File "/airflow/env/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/airflow/env/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__
    return self.run(*args, **kwargs)
  File "/airflow/env/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command
    _execute_in_fork(command_to_exec)
  File "/airflow/env/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
    raise AirflowException('Celery command failed on host: ' + get_hostname())
airflow.exceptions.AirflowException: Celery command failed on host: b0b216e2ccea
```
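The mechanism, as far as I can tell from the traceback: as of 2.1.2, `airflow.utils.log.json_formatter.JSONFormatter` overrides `usesTime()` to consult `json_fields` rather than the format string, so `record.asctime` is never populated when `asctime` is missing from `json_fields`, and the `%(asctime)s` placeholder in `log_format` then fails. A minimal standalone sketch of that behavior (a hypothetical `MiniJSONFormatter` modeled on the real class, not Airflow's exact code):

```python
import json
import logging

# Hypothetical stand-in for airflow.utils.log.json_formatter.JSONFormatter:
# %-format the record with the configured log_format, then emit only the
# configured json_fields as JSON.
class MiniJSONFormatter(logging.Formatter):
    def __init__(self, fmt=None, json_fields=()):
        super().__init__(fmt)
        self.json_fields = json_fields

    def usesTime(self):
        # Like the real class: asctime is only populated on the record
        # when it is listed in json_fields, ignoring the fmt string.
        return "asctime" in self.json_fields

    def format(self, record):
        # logging.Formatter.format() skips setting record.asctime because
        # usesTime() is False, then fails on the %(asctime)s placeholder.
        super().format(record)
        return json.dumps({f: getattr(record, f, None) for f in self.json_fields})

handler = logging.StreamHandler()
handler.setFormatter(
    MiniJSONFormatter(
        fmt="[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s",
        json_fields=("filename", "lineno", "levelname", "message"),
    )
)
logger = logging.getLogger("repro")
logger.addHandler(handler)
# Prints "ValueError: Formatting field not found in record: 'asctime'"
# through logging's handleError machinery.
logger.error("boom")
```

This standalone version only prints the `ValueError` via `handleError`; inside the worker, the error handling apparently re-enters the same broken formatter, which is what ends in the `RecursionError`.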
My logging settings are:
```
[logging]
remote_logging = True
log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
[elasticsearch]
host = http://localhost:9200
log_id_template = {dag_id}-{task_id}-{execution_date}-{try_number}
end_of_log_mark = end_of_log
write_stdout = True
json_format = True
# There is no `asctime` here
json_fields = filename, lineno, levelname, message
host_field = host
offset_field = offset
```
We use a custom `log_config.py` to write all logs to syslog:
```python
import copy
import sys

from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG

# We don't want Airflow to write to files:
# force stdout or elastic only.
LOGGING_CONFIG = copy.deepcopy(DEFAULT_LOGGING_CONFIG)
for handler, config in LOGGING_CONFIG['handlers'].items():
    if (handler in ('console', 'processor', 'processor_manager')
            or (handler == 'task'
                and config['class'] == 'airflow.utils.log.file_task_handler.FileTaskHandler')):
        LOGGING_CONFIG['handlers'][handler] = {
            # The default airflow.utils.log.logging_mixin.RedirectStdHandler
            # breaks the DAG processor.
            'class': 'logging.StreamHandler',
            'formatter': 'airflow',
            'stream': sys.stdout,
            'filters': ['mask_secrets'],
        }
```
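For reference, Airflow is pointed at this module via `logging_config_class` (assuming `log_config.py` is importable, e.g. from a directory on `PYTHONPATH`):

```
[logging]
logging_config_class = log_config.LOGGING_CONFIG
```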
If I add `asctime` to `json_fields`, remove it from `log_format`, or disable
remote logging, everything works as expected.
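For example, this variant of the `[elasticsearch]` section is enough to avoid the crash:

```
[elasticsearch]
json_fields = asctime, filename, lineno, levelname, message
```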
**What you expected to happen**:
The Celery worker should not fail.
**How to reproduce it**:
Configure logging and elasticsearch as in the examples above, then execute any
DAG and look at the worker logs.