EtsuNDmA opened a new issue #17336:
URL: https://github.com/apache/airflow/issues/17336


   - **Apache Airflow version**: 2.1.2
   - **Python version**: 3.8.5
   - **Executor**: CeleryExecutor
   
   **Environment**:
   
   - **OS** (e.g. from /etc/os-release):  Debian GNU/Linux 10 (buster)
   - **Kernel** (e.g. `uname -a`): Linux 074f2dc34905 5.10.25-linuxkit
   
   **What happened**:
   We use remote logging to Elasticsearch. After upgrading from 1.10.15 to 2.1.2, the Celery worker fails on every task when `logging.log_format` contains `asctime` but `elasticsearch.json_fields` does not.
   
   Worker logs:
   ```
    [2021-07-30 09:29:04,120: ERROR/ForkPoolWorker-1] Failed to execute task maximum recursion depth exceeded while calling a Python object.
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/logging/__init__.py", line 436, in format
        return self._format(record)
      File "/usr/local/lib/python3.8/logging/__init__.py", line 432, in _format
        return self._fmt % record.__dict__
    KeyError: 'asctime'

    During handling of the above exception, another exception occurred:

    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/logging/__init__.py", line 1081, in emit
        msg = self.format(record)
      File "/usr/local/lib/python3.8/logging/__init__.py", line 925, in format
        return fmt.format(record)
      File "/airflow/env/lib/python3.8/site-packages/airflow/utils/log/json_formatter.py", line 43, in format
        super().format(record)
      File "/usr/local/lib/python3.8/logging/__init__.py", line 667, in format
        s = self.formatMessage(record)
      File "/usr/local/lib/python3.8/logging/__init__.py", line 636, in formatMessage
        return self._style.format(record)
      File "/usr/local/lib/python3.8/logging/__init__.py", line 438, in format
        raise ValueError('Formatting field not found in record: %s' % e)
    ValueError: Formatting field not found in record: 'asctime'

    ... repeat many times until RecursionError

      File "/usr/local/lib/python3.8/posixpath.py", line 143, in basename
        sep = _get_sep(p)
      File "/usr/local/lib/python3.8/posixpath.py", line 42, in _get_sep
        if isinstance(path, bytes):
    RecursionError: maximum recursion depth exceeded while calling a Python object

    [2021-07-30 09:29:04,159: ERROR/ForkPoolWorker-1] Task airflow.executors.celery_executor.execute_command[505225e3-267c-4661-87af-8618a4f7d202] raised unexpected: AirflowException('Celery command failed on host: b0b216e2ccea')
    Traceback (most recent call last):
      File "/airflow/env/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task
        R = retval = fun(*args, **kwargs)
      File "/airflow/env/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__
        return self.run(*args, **kwargs)
      File "/airflow/env/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command
        _execute_in_fork(command_to_exec)
      File "/airflow/env/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
        raise AirflowException('Celery command failed on host: ' + get_hostname())
    airflow.exceptions.AirflowException: Celery command failed on host: b0b216e2ccea
   ```
   
   My logging settings are:
   ```
   [logging]
   remote_logging = True
   log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
   
   [elasticsearch]
   host = http://localhost:9200
   log_id_template = {dag_id}-{task_id}-{execution_date}-{try_number}
   end_of_log_mark = end_of_log
   write_stdout = True
   json_format = True
   
   # There is no `asctime` here
   json_fields = filename, lineno, levelname, message 
   
   host_field = host
   offset_field = offset
   ```
   
   We use a custom `log_config.py` to write all logs to syslog:
   
   ```python
    import copy
    import sys

    from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG


    # we don't want airflow to write to files
    # force stdout or elastic only
    LOGGING_CONFIG = copy.deepcopy(DEFAULT_LOGGING_CONFIG)
    for handler, config in LOGGING_CONFIG['handlers'].items():
        if (
            handler in ('console', 'processor', 'processor_manager')
            or (handler == 'task' and config['class'] == 'airflow.utils.log.file_task_handler.FileTaskHandler')
        ):
            LOGGING_CONFIG['handlers'][handler] = {
                # the default airflow.utils.log.logging_mixin.RedirectStdHandler breaks the DAG processor
                'class': 'logging.StreamHandler',
                'formatter': 'airflow',
                'stream': sys.stdout,
                'filters': ['mask_secrets'],
            }
   ```
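
   For completeness, this dict is activated via the `logging_config_class` option under `[logging]` (assuming `log_config.py` is importable on the worker's `PYTHONPATH`):

   ```
   [logging]
   logging_config_class = log_config.LOGGING_CONFIG
   ```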
   
   
   If I add `asctime` to `json_fields`, remove it from `log_format`, or disable remote logging, everything works as expected.
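
   For what it's worth, the inner error can be reproduced with the standard library alone. This is a minimal sketch, assuming (as the traceback suggests) that Airflow's JSONFormatter decides `usesTime()` from `json_fields` rather than from the format string; `NoAsctimeFormatter` and its `json_fields` list are hypothetical stand-ins for illustration, not Airflow code:

   ```python
   import logging
   import sys


   class NoAsctimeFormatter(logging.Formatter):
       """Hypothetical stand-in for a formatter that decides usesTime() from a
       configured field list (analogous to elasticsearch.json_fields) instead of
       from the format string."""

       json_fields = ['filename', 'lineno', 'levelname', 'message']  # no 'asctime'

       def usesTime(self):
           # False -> logging.Formatter.format() never sets record.asctime,
           # although formatMessage() still applies a format string that needs it
           return 'asctime' in self.json_fields


   handler = logging.StreamHandler(sys.stdout)
   handler.setFormatter(
       NoAsctimeFormatter('[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s')
   )
   logger = logging.getLogger('demo')
   logger.addHandler(handler)

   # On Python 3.8 this prints a "--- Logging error ---" traceback ending in
   # "ValueError: Formatting field not found in record: 'asctime'",
   # the same inner error as in the worker log above.
   logger.error('boom')
   ```

   In the worker, that error presumably gets logged again through the same broken formatter, which would explain why it repeats until the RecursionError.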
   
   **What you expected to happen**:
   The Celery worker should not fail.
   
   **How to reproduce it**:
   Configure logging and elasticsearch as in the example above. Execute any DAG. Look at the worker logs.
   

