This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch logging-settings-is-not-a-template
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit a7064f295feab056eebd094327bc1573a2b83103 Author: Ash Berlin-Taylor <[email protected]> AuthorDate: Mon Jun 1 13:53:50 2020 +0100 airflow_local_settings is not a "template" file All of the other files in the config_templates are that -- templates, but airflow_local_settings as it was was the actual file used by running installs. For the sake of clarity let's move it --- airflow/config_templates/airflow_local_settings.py | 283 +-------------------- airflow/configuration.py | 12 +- airflow/logging_config.py | 282 +++++++++++++++++++- .../logging-monitoring/logging-tasks.rst | 2 +- newsfragments/9092.significant.rst | 8 + tests/api_connexion/endpoints/test_log_endpoint.py | 2 +- tests/dag_processing/test_manager.py | 2 +- .../task/task_runner/test_standard_task_runner.py | 2 +- tests/utils/log/test_log_reader.py | 2 +- tests/utils/test_log_handlers.py | 2 +- .../test_task_handler_with_custom_formatter.py | 2 +- tests/www/views/test_views_log.py | 2 +- 12 files changed, 314 insertions(+), 287 deletions(-) diff --git a/airflow/config_templates/airflow_local_settings.py b/airflow/config_templates/airflow_local_settings.py index 6684fd18e5..bcb604d7fd 100644 --- a/airflow/config_templates/airflow_local_settings.py +++ b/airflow/config_templates/airflow_local_settings.py @@ -15,283 +15,14 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -"""Airflow logging settings""" +"""This module is deprecated. Please use `airflow.logging_config` instead""" -import os -from pathlib import Path -from typing import Any, Dict, Optional, Union -from urllib.parse import urlparse +import warnings -from airflow.configuration import conf -from airflow.exceptions import AirflowException +from airflow.logging_config import * # noqa -# TODO: Logging format and level should be configured -# in this file instead of from airflow.cfg. Currently -# there are other log format and level configurations in -# settings.py and cli.py. Please see AIRFLOW-1455. -LOG_LEVEL: str = conf.get_mandatory_value('logging', 'LOGGING_LEVEL').upper() - - -# Flask appbuilder's info level log is very verbose, -# so it's set to 'WARN' by default. -FAB_LOG_LEVEL: str = conf.get_mandatory_value('logging', 'FAB_LOGGING_LEVEL').upper() - -LOG_FORMAT: str = conf.get_mandatory_value('logging', 'LOG_FORMAT') - -COLORED_LOG_FORMAT: str = conf.get_mandatory_value('logging', 'COLORED_LOG_FORMAT') - -COLORED_LOG: bool = conf.getboolean('logging', 'COLORED_CONSOLE_LOG') - -COLORED_FORMATTER_CLASS: str = conf.get_mandatory_value('logging', 'COLORED_FORMATTER_CLASS') - -BASE_LOG_FOLDER: str = conf.get_mandatory_value('logging', 'BASE_LOG_FOLDER') - -PROCESSOR_LOG_FOLDER: str = conf.get_mandatory_value('scheduler', 'CHILD_PROCESS_LOG_DIRECTORY') - -DAG_PROCESSOR_MANAGER_LOG_LOCATION: str = conf.get_mandatory_value( - 'logging', 'DAG_PROCESSOR_MANAGER_LOG_LOCATION' +warnings.warn( + "This module is deprecated. 
Please use `airflow.logging_config`.", + DeprecationWarning, + stacklevel=2, ) - -FILENAME_TEMPLATE: str = conf.get_mandatory_value('logging', 'LOG_FILENAME_TEMPLATE') - -PROCESSOR_FILENAME_TEMPLATE: str = conf.get_mandatory_value('logging', 'LOG_PROCESSOR_FILENAME_TEMPLATE') - -DEFAULT_LOGGING_CONFIG: Dict[str, Any] = { - 'version': 1, - 'disable_existing_loggers': False, - 'formatters': { - 'airflow': {'format': LOG_FORMAT}, - 'airflow_coloured': { - 'format': COLORED_LOG_FORMAT if COLORED_LOG else LOG_FORMAT, - 'class': COLORED_FORMATTER_CLASS if COLORED_LOG else 'logging.Formatter', - }, - }, - 'filters': { - 'mask_secrets': { - '()': 'airflow.utils.log.secrets_masker.SecretsMasker', - }, - }, - 'handlers': { - 'console': { - 'class': 'airflow.utils.log.logging_mixin.RedirectStdHandler', - 'formatter': 'airflow_coloured', - 'stream': 'sys.stdout', - 'filters': ['mask_secrets'], - }, - 'task': { - 'class': 'airflow.utils.log.file_task_handler.FileTaskHandler', - 'formatter': 'airflow', - 'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER), - 'filters': ['mask_secrets'], - }, - 'processor': { - 'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler', - 'formatter': 'airflow', - 'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER), - 'filename_template': PROCESSOR_FILENAME_TEMPLATE, - 'filters': ['mask_secrets'], - }, - }, - 'loggers': { - 'airflow.processor': { - 'handlers': ['processor'], - 'level': LOG_LEVEL, - 'propagate': False, - }, - 'airflow.task': { - 'handlers': ['task'], - 'level': LOG_LEVEL, - 'propagate': False, - 'filters': ['mask_secrets'], - }, - 'flask_appbuilder': { - 'handlers': ['console'], - 'level': FAB_LOG_LEVEL, - 'propagate': True, - }, - }, - 'root': { - 'handlers': ['console'], - 'level': LOG_LEVEL, - 'filters': ['mask_secrets'], - }, -} - -EXTRA_LOGGER_NAMES: Optional[str] = conf.get('logging', 'EXTRA_LOGGER_NAMES', fallback=None) -if EXTRA_LOGGER_NAMES: - new_loggers = { - logger_name.strip(): { - 'handlers': ['console'], - 'level': LOG_LEVEL, - 'propagate': True, - } - for logger_name in EXTRA_LOGGER_NAMES.split(",") - } - DEFAULT_LOGGING_CONFIG['loggers'].update(new_loggers) - -DEFAULT_DAG_PARSING_LOGGING_CONFIG: Dict[str, Dict[str, Dict[str, Any]]] = { - 'handlers': { - 'processor_manager': { - 'class': 'logging.handlers.RotatingFileHandler', - 'formatter': 'airflow', - 'filename': DAG_PROCESSOR_MANAGER_LOG_LOCATION, - 'mode': 'a', - 'maxBytes': 104857600, # 100MB - 'backupCount': 5, - } - }, - 'loggers': { - 'airflow.processor_manager': { - 'handlers': ['processor_manager'], - 'level': LOG_LEVEL, - 'propagate': False, - } - }, -} - -# Only update the handlers and loggers when CONFIG_PROCESSOR_MANAGER_LOGGER is set. -# This is to avoid exceptions when initializing RotatingFileHandler multiple times -# in multiple processes. -if os.environ.get('CONFIG_PROCESSOR_MANAGER_LOGGER') == 'True': - DEFAULT_LOGGING_CONFIG['handlers'].update(DEFAULT_DAG_PARSING_LOGGING_CONFIG['handlers']) - DEFAULT_LOGGING_CONFIG['loggers'].update(DEFAULT_DAG_PARSING_LOGGING_CONFIG['loggers']) - - # Manually create log directory for processor_manager handler as RotatingFileHandler - # will only create file but not the directory. 
- processor_manager_handler_config: Dict[str, Any] = DEFAULT_DAG_PARSING_LOGGING_CONFIG['handlers'][ - 'processor_manager' - ] - directory: str = os.path.dirname(processor_manager_handler_config['filename']) - Path(directory).mkdir(parents=True, exist_ok=True, mode=0o755) - -################## -# Remote logging # -################## - -REMOTE_LOGGING: bool = conf.getboolean('logging', 'remote_logging') - -if REMOTE_LOGGING: - - ELASTICSEARCH_HOST: Optional[str] = conf.get('elasticsearch', 'HOST') - - # Storage bucket URL for remote logging - # S3 buckets should start with "s3://" - # Cloudwatch log groups should start with "cloudwatch://" - # GCS buckets should start with "gs://" - # WASB buckets should start with "wasb" - # just to help Airflow select correct handler - REMOTE_BASE_LOG_FOLDER: str = conf.get_mandatory_value('logging', 'REMOTE_BASE_LOG_FOLDER') - - if REMOTE_BASE_LOG_FOLDER.startswith('s3://'): - S3_REMOTE_HANDLERS: Dict[str, Dict[str, str]] = { - 'task': { - 'class': 'airflow.providers.amazon.aws.log.s3_task_handler.S3TaskHandler', - 'formatter': 'airflow', - 'base_log_folder': str(os.path.expanduser(BASE_LOG_FOLDER)), - 's3_log_folder': REMOTE_BASE_LOG_FOLDER, - 'filename_template': FILENAME_TEMPLATE, - }, - } - - DEFAULT_LOGGING_CONFIG['handlers'].update(S3_REMOTE_HANDLERS) - elif REMOTE_BASE_LOG_FOLDER.startswith('cloudwatch://'): - url_parts = urlparse(REMOTE_BASE_LOG_FOLDER) - CLOUDWATCH_REMOTE_HANDLERS: Dict[str, Dict[str, str]] = { - 'task': { - 'class': 'airflow.providers.amazon.aws.log.cloudwatch_task_handler.CloudwatchTaskHandler', - 'formatter': 'airflow', - 'base_log_folder': str(os.path.expanduser(BASE_LOG_FOLDER)), - 'log_group_arn': url_parts.netloc + url_parts.path, - 'filename_template': FILENAME_TEMPLATE, - }, - } - - DEFAULT_LOGGING_CONFIG['handlers'].update(CLOUDWATCH_REMOTE_HANDLERS) - elif REMOTE_BASE_LOG_FOLDER.startswith('gs://'): - key_path = conf.get_mandatory_value('logging', 'GOOGLE_KEY_PATH', fallback=None) - GCS_REMOTE_HANDLERS: Dict[str, Dict[str, str]] = { - 'task': { - 'class': 'airflow.providers.google.cloud.log.gcs_task_handler.GCSTaskHandler', - 'formatter': 'airflow', - 'base_log_folder': str(os.path.expanduser(BASE_LOG_FOLDER)), - 'gcs_log_folder': REMOTE_BASE_LOG_FOLDER, - 'filename_template': FILENAME_TEMPLATE, - 'gcp_key_path': key_path, - }, - } - - DEFAULT_LOGGING_CONFIG['handlers'].update(GCS_REMOTE_HANDLERS) - elif REMOTE_BASE_LOG_FOLDER.startswith('wasb'): - WASB_REMOTE_HANDLERS: Dict[str, Dict[str, Union[str, bool]]] = { - 'task': { - 'class': 'airflow.providers.microsoft.azure.log.wasb_task_handler.WasbTaskHandler', - 'formatter': 'airflow', - 'base_log_folder': str(os.path.expanduser(BASE_LOG_FOLDER)), - 'wasb_log_folder': REMOTE_BASE_LOG_FOLDER, - 'wasb_container': 'airflow-logs', - 'filename_template': FILENAME_TEMPLATE, - 'delete_local_copy': False, - }, - } - - DEFAULT_LOGGING_CONFIG['handlers'].update(WASB_REMOTE_HANDLERS) - elif REMOTE_BASE_LOG_FOLDER.startswith('stackdriver://'): - key_path = conf.get_mandatory_value('logging', 'GOOGLE_KEY_PATH', fallback=None) - # stackdriver:///airflow-tasks => airflow-tasks - log_name = urlparse(REMOTE_BASE_LOG_FOLDER).path[1:] - STACKDRIVER_REMOTE_HANDLERS = { - 'task': { - 'class': 'airflow.providers.google.cloud.log.stackdriver_task_handler.StackdriverTaskHandler', - 'formatter': 'airflow', - 'name': log_name, - 'gcp_key_path': key_path, - } - } - - DEFAULT_LOGGING_CONFIG['handlers'].update(STACKDRIVER_REMOTE_HANDLERS) - elif REMOTE_BASE_LOG_FOLDER.startswith('oss://'): - 
OSS_REMOTE_HANDLERS = { - 'task': { - 'class': 'airflow.providers.alibaba.cloud.log.oss_task_handler.OSSTaskHandler', - 'formatter': 'airflow', - 'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER), - 'oss_log_folder': REMOTE_BASE_LOG_FOLDER, - 'filename_template': FILENAME_TEMPLATE, - }, - } - DEFAULT_LOGGING_CONFIG['handlers'].update(OSS_REMOTE_HANDLERS) - elif ELASTICSEARCH_HOST: - ELASTICSEARCH_LOG_ID_TEMPLATE: str = conf.get_mandatory_value('elasticsearch', 'LOG_ID_TEMPLATE') - ELASTICSEARCH_END_OF_LOG_MARK: str = conf.get_mandatory_value('elasticsearch', 'END_OF_LOG_MARK') - ELASTICSEARCH_FRONTEND: str = conf.get_mandatory_value('elasticsearch', 'frontend') - ELASTICSEARCH_WRITE_STDOUT: bool = conf.getboolean('elasticsearch', 'WRITE_STDOUT') - ELASTICSEARCH_JSON_FORMAT: bool = conf.getboolean('elasticsearch', 'JSON_FORMAT') - ELASTICSEARCH_JSON_FIELDS: str = conf.get_mandatory_value('elasticsearch', 'JSON_FIELDS') - ELASTICSEARCH_HOST_FIELD: str = conf.get_mandatory_value('elasticsearch', 'HOST_FIELD') - ELASTICSEARCH_OFFSET_FIELD: str = conf.get_mandatory_value('elasticsearch', 'OFFSET_FIELD') - - ELASTIC_REMOTE_HANDLERS: Dict[str, Dict[str, Union[str, bool]]] = { - 'task': { - 'class': 'airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler', - 'formatter': 'airflow', - 'base_log_folder': str(os.path.expanduser(BASE_LOG_FOLDER)), - 'log_id_template': ELASTICSEARCH_LOG_ID_TEMPLATE, - 'filename_template': FILENAME_TEMPLATE, - 'end_of_log_mark': ELASTICSEARCH_END_OF_LOG_MARK, - 'host': ELASTICSEARCH_HOST, - 'frontend': ELASTICSEARCH_FRONTEND, - 'write_stdout': ELASTICSEARCH_WRITE_STDOUT, - 'json_format': ELASTICSEARCH_JSON_FORMAT, - 'json_fields': ELASTICSEARCH_JSON_FIELDS, - 'host_field': ELASTICSEARCH_HOST_FIELD, - 'offset_field': ELASTICSEARCH_OFFSET_FIELD, - }, - } - - DEFAULT_LOGGING_CONFIG['handlers'].update(ELASTIC_REMOTE_HANDLERS) - else: - raise AirflowException( - "Incorrect remote log configuration. Please check the configuration of option 'host' in " - "section 'elasticsearch' if you are using Elasticsearch. In the other case, " - "'remote_base_log_folder' option in the 'logging' section." 
- ) diff --git a/airflow/configuration.py b/airflow/configuration.py index 84df81812c..813f1a39eb 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -228,6 +228,11 @@ class AirflowConfigParser(ConfigParser): deprecated_values: Dict[str, Dict[str, Tuple[Pattern, str, str]]] = { 'core': { 'hostname_callable': (re.compile(r':'), r'.', '2.1'), + 'logging_config_class': ( + re.compile(r'\Aairflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG\Z'), + 'airflow.logging_config.DEFAULT_LOGGING_CONFIG', + '3.0', + ), }, 'webserver': { 'navbar_color': (re.compile(r'\A#007A87\Z', re.IGNORECASE), '#fff', '2.1'), @@ -246,6 +251,11 @@ class AirflowConfigParser(ConfigParser): "XX-set-after-default-config-loaded-XX", '3.0', ), + 'logging_config_class': ( + re.compile(r'\Aairflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG\Z'), + 'airflow.logging_config.DEFAULT_LOGGING_CONFIG', + '3.0', + ), }, 'api': { 'auth_backends': ( @@ -422,7 +432,7 @@ class AirflowConfigParser(ConfigParser): ) def _using_old_value(self, old: Pattern, current_value: str) -> bool: - return old.search(current_value) is not None + return current_value is not None and old.search(current_value) is not None def _update_env_var(self, section: str, name: str, new_value: Union[str]): env_var = self._env_var_name(section, name) diff --git a/airflow/logging_config.py b/airflow/logging_config.py index 645e53eb3e..0b35eade9b 100644 --- a/airflow/logging_config.py +++ b/airflow/logging_config.py @@ -17,8 +17,12 @@ # under the License. # import logging +import os import warnings from logging.config import dictConfig +from pathlib import Path +from typing import Any, Dict, Optional, Union +from urllib.parse import urlparse from airflow.configuration import conf from airflow.exceptions import AirflowConfigException @@ -48,8 +52,8 @@ def configure_logging(): # Import default logging configurations. raise ImportError(f'Unable to load custom logging from {logging_class_path} due to {err}') else: - logging_class_path = 'airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG' - logging_config = import_string(logging_class_path) + logging_config = DEFAULT_LOGGING_CONFIG + logging_class_path = 'airflow.logging_config.DEFAULT_LOGGING_CONFIG' log.debug('Unable to load custom logging, using default config instead') try: @@ -82,6 +86,12 @@ def validate_logging_config(logging_config): # Now lets validate the other logging-related settings task_log_reader = conf.get('logging', 'task_log_reader') + if REMOTE_LOGGING and logging_config['handlers']['task'] is __stock_task_handler: + raise AirflowConfigException( + "[logging] remote_logging config was enabled, but no suitable backend was detected. Please check" + " the 'remote_base_log_folder' setting" + ) + logger = logging.getLogger('airflow.task') def _get_handler(name): @@ -102,3 +112,271 @@ def validate_logging_config(logging_config): f"Configured task_log_reader {task_log_reader!r} was not a handler of " f"the 'airflow.task' logger." ) + + +# TODO: Logging format and level should be configured +# in this file instead of from airflow.cfg. Currently +# there are other log format and level configurations in +# settings.py and cli.py. Please see AIRFLOW-1455. +LOG_LEVEL: str = conf.get_mandatory_value('logging', 'LOGGING_LEVEL').upper() + + +# Flask appbuilder's info level log is very verbose, +# so it's set to 'WARN' by default. 
+FAB_LOG_LEVEL: str = conf.get_mandatory_value('logging', 'FAB_LOGGING_LEVEL').upper() + +LOG_FORMAT: str = conf.get_mandatory_value('logging', 'LOG_FORMAT') + +COLORED_LOG_FORMAT: str = conf.get_mandatory_value('logging', 'COLORED_LOG_FORMAT') + +COLORED_LOG: bool = conf.getboolean('logging', 'COLORED_CONSOLE_LOG') + +COLORED_FORMATTER_CLASS: str = conf.get_mandatory_value('logging', 'COLORED_FORMATTER_CLASS') + +BASE_LOG_FOLDER: str = conf.get_mandatory_value('logging', 'BASE_LOG_FOLDER') + +PROCESSOR_LOG_FOLDER: str = conf.get_mandatory_value('scheduler', 'CHILD_PROCESS_LOG_DIRECTORY') + +DAG_PROCESSOR_MANAGER_LOG_LOCATION: str = conf.get_mandatory_value( + 'logging', 'DAG_PROCESSOR_MANAGER_LOG_LOCATION' +) + +FILENAME_TEMPLATE: str = conf.get_mandatory_value('logging', 'LOG_FILENAME_TEMPLATE') + +PROCESSOR_FILENAME_TEMPLATE: str = conf.get_mandatory_value('logging', 'LOG_PROCESSOR_FILENAME_TEMPLATE') + +DEFAULT_LOGGING_CONFIG: Dict[str, Any] = { + 'version': 1, + 'disable_existing_loggers': False, + 'formatters': { + 'airflow': {'format': LOG_FORMAT}, + 'airflow_coloured': { + 'format': COLORED_LOG_FORMAT if COLORED_LOG else LOG_FORMAT, + 'class': COLORED_FORMATTER_CLASS if COLORED_LOG else 'logging.Formatter', + }, + }, + 'filters': { + 'mask_secrets': { + '()': 'airflow.utils.log.secrets_masker.SecretsMasker', + }, + }, + 'handlers': { + 'console': { + 'class': 'airflow.utils.log.logging_mixin.RedirectStdHandler', + 'formatter': 'airflow_coloured', + 'stream': 'sys.stdout', + 'filters': ['mask_secrets'], + }, + 'task': { + 'class': 'airflow.utils.log.file_task_handler.FileTaskHandler', + 'formatter': 'airflow', + 'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER), + 'filters': ['mask_secrets'], + }, + 'processor': { + 'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler', + 'formatter': 'airflow', + 'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER), + 'filename_template': PROCESSOR_FILENAME_TEMPLATE, + 'filters': ['mask_secrets'], + }, + }, + 'loggers': { + 'airflow.processor': { + 'handlers': ['processor'], + 'level': LOG_LEVEL, + 'propagate': False, + }, + 'airflow.task': { + 'handlers': ['task'], + 'level': LOG_LEVEL, + 'propagate': False, + 'filters': ['mask_secrets'], + }, + 'flask_appbuilder': { + 'handlers': ['console'], + 'level': FAB_LOG_LEVEL, + 'propagate': True, + }, + }, + 'root': { + 'handlers': ['console'], + 'level': LOG_LEVEL, + 'filters': ['mask_secrets'], + }, +} + +__stock_task_handler = DEFAULT_LOGGING_CONFIG['handlers']['task'] + +EXTRA_LOGGER_NAMES: Optional[str] = conf.get('logging', 'EXTRA_LOGGER_NAMES', fallback=None) +if EXTRA_LOGGER_NAMES: + new_loggers = { + logger_name.strip(): { + 'handlers': ['console'], + 'level': LOG_LEVEL, + 'propagate': True, + } + for logger_name in EXTRA_LOGGER_NAMES.split(",") + } + DEFAULT_LOGGING_CONFIG['loggers'].update(new_loggers) + +DEFAULT_DAG_PARSING_LOGGING_CONFIG: Dict[str, Dict[str, Dict[str, Any]]] = { + 'handlers': { + 'processor_manager': { + 'class': 'logging.handlers.RotatingFileHandler', + 'formatter': 'airflow', + 'filename': DAG_PROCESSOR_MANAGER_LOG_LOCATION, + 'mode': 'a', + 'maxBytes': 104857600, # 100MB + 'backupCount': 5, + } + }, + 'loggers': { + 'airflow.processor_manager': { + 'handlers': ['processor_manager'], + 'level': LOG_LEVEL, + 'propagate': False, + } + }, +} + +# Only update the handlers and loggers when CONFIG_PROCESSOR_MANAGER_LOGGER is set. +# This is to avoid exceptions when initializing RotatingFileHandler multiple times +# in multiple processes. 
+if os.environ.get('CONFIG_PROCESSOR_MANAGER_LOGGER') == 'True': + DEFAULT_LOGGING_CONFIG['handlers'].update(DEFAULT_DAG_PARSING_LOGGING_CONFIG['handlers']) + DEFAULT_LOGGING_CONFIG['loggers'].update(DEFAULT_DAG_PARSING_LOGGING_CONFIG['loggers']) + + # Manually create log directory for processor_manager handler as RotatingFileHandler + # will only create file but not the directory. + processor_manager_handler_config: Dict[str, Any] = DEFAULT_DAG_PARSING_LOGGING_CONFIG['handlers'][ + 'processor_manager' + ] + directory: str = os.path.dirname(processor_manager_handler_config['filename']) + Path(directory).mkdir(parents=True, exist_ok=True, mode=0o755) + +################## +# Remote logging # +################## + +REMOTE_LOGGING: bool = conf.getboolean('logging', 'remote_logging') + +if REMOTE_LOGGING: + + ELASTICSEARCH_HOST: Optional[str] = conf.get('elasticsearch', 'HOST') + + # Storage bucket URL for remote logging + # S3 buckets should start with "s3://" + # Cloudwatch log groups should start with "cloudwatch://" + # GCS buckets should start with "gs://" + # WASB buckets should start with "wasb" + # just to help Airflow select correct handler + REMOTE_BASE_LOG_FOLDER: str = conf.get_mandatory_value('logging', 'REMOTE_BASE_LOG_FOLDER') + + if REMOTE_BASE_LOG_FOLDER.startswith('s3://'): + S3_REMOTE_HANDLERS: Dict[str, Dict[str, str]] = { + 'task': { + 'class': 'airflow.providers.amazon.aws.log.s3_task_handler.S3TaskHandler', + 'formatter': 'airflow', + 'base_log_folder': str(os.path.expanduser(BASE_LOG_FOLDER)), + 's3_log_folder': REMOTE_BASE_LOG_FOLDER, + 'filename_template': FILENAME_TEMPLATE, + }, + } + + DEFAULT_LOGGING_CONFIG['handlers'].update(S3_REMOTE_HANDLERS) + elif REMOTE_BASE_LOG_FOLDER.startswith('cloudwatch://'): + url_parts = urlparse(REMOTE_BASE_LOG_FOLDER) + CLOUDWATCH_REMOTE_HANDLERS: Dict[str, Dict[str, str]] = { + 'task': { + 'class': 'airflow.providers.amazon.aws.log.cloudwatch_task_handler.CloudwatchTaskHandler', + 'formatter': 'airflow', + 'base_log_folder': str(os.path.expanduser(BASE_LOG_FOLDER)), + 'log_group_arn': url_parts.netloc + url_parts.path, + 'filename_template': FILENAME_TEMPLATE, + }, + } + + DEFAULT_LOGGING_CONFIG['handlers'].update(CLOUDWATCH_REMOTE_HANDLERS) + elif REMOTE_BASE_LOG_FOLDER.startswith('gs://'): + key_path = conf.get_mandatory_value('logging', 'GOOGLE_KEY_PATH', fallback=None) + GCS_REMOTE_HANDLERS: Dict[str, Dict[str, str]] = { + 'task': { + 'class': 'airflow.providers.google.cloud.log.gcs_task_handler.GCSTaskHandler', + 'formatter': 'airflow', + 'base_log_folder': str(os.path.expanduser(BASE_LOG_FOLDER)), + 'gcs_log_folder': REMOTE_BASE_LOG_FOLDER, + 'filename_template': FILENAME_TEMPLATE, + 'gcp_key_path': key_path, + }, + } + + DEFAULT_LOGGING_CONFIG['handlers'].update(GCS_REMOTE_HANDLERS) + elif REMOTE_BASE_LOG_FOLDER.startswith('wasb'): + WASB_REMOTE_HANDLERS: Dict[str, Dict[str, Union[str, bool]]] = { + 'task': { + 'class': 'airflow.providers.microsoft.azure.log.wasb_task_handler.WasbTaskHandler', + 'formatter': 'airflow', + 'base_log_folder': str(os.path.expanduser(BASE_LOG_FOLDER)), + 'wasb_log_folder': REMOTE_BASE_LOG_FOLDER, + 'wasb_container': 'airflow-logs', + 'filename_template': FILENAME_TEMPLATE, + 'delete_local_copy': False, + }, + } + + DEFAULT_LOGGING_CONFIG['handlers'].update(WASB_REMOTE_HANDLERS) + elif REMOTE_BASE_LOG_FOLDER.startswith('stackdriver://'): + key_path = conf.get_mandatory_value('logging', 'GOOGLE_KEY_PATH', fallback=None) + # stackdriver:///airflow-tasks => airflow-tasks + log_name = 
urlparse(REMOTE_BASE_LOG_FOLDER).path[1:] + STACKDRIVER_REMOTE_HANDLERS = { + 'task': { + 'class': 'airflow.providers.google.cloud.log.stackdriver_task_handler.StackdriverTaskHandler', + 'formatter': 'airflow', + 'name': log_name, + 'gcp_key_path': key_path, + } + } + + DEFAULT_LOGGING_CONFIG['handlers'].update(STACKDRIVER_REMOTE_HANDLERS) + elif REMOTE_BASE_LOG_FOLDER.startswith('oss://'): + OSS_REMOTE_HANDLERS = { + 'task': { + 'class': 'airflow.providers.alibaba.cloud.log.oss_task_handler.OSSTaskHandler', + 'formatter': 'airflow', + 'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER), + 'oss_log_folder': REMOTE_BASE_LOG_FOLDER, + 'filename_template': FILENAME_TEMPLATE, + }, + } + DEFAULT_LOGGING_CONFIG['handlers'].update(OSS_REMOTE_HANDLERS) + elif ELASTICSEARCH_HOST: + ELASTICSEARCH_LOG_ID_TEMPLATE: str = conf.get_mandatory_value('elasticsearch', 'LOG_ID_TEMPLATE') + ELASTICSEARCH_END_OF_LOG_MARK: str = conf.get_mandatory_value('elasticsearch', 'END_OF_LOG_MARK') + ELASTICSEARCH_FRONTEND: str = conf.get_mandatory_value('elasticsearch', 'frontend') + ELASTICSEARCH_WRITE_STDOUT: bool = conf.getboolean('elasticsearch', 'WRITE_STDOUT') + ELASTICSEARCH_JSON_FORMAT: bool = conf.getboolean('elasticsearch', 'JSON_FORMAT') + ELASTICSEARCH_JSON_FIELDS: str = conf.get_mandatory_value('elasticsearch', 'JSON_FIELDS') + ELASTICSEARCH_HOST_FIELD: str = conf.get_mandatory_value('elasticsearch', 'HOST_FIELD') + ELASTICSEARCH_OFFSET_FIELD: str = conf.get_mandatory_value('elasticsearch', 'OFFSET_FIELD') + + ELASTIC_REMOTE_HANDLERS: Dict[str, Dict[str, Union[str, bool]]] = { + 'task': { + 'class': 'airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler', + 'formatter': 'airflow', + 'base_log_folder': str(os.path.expanduser(BASE_LOG_FOLDER)), + 'log_id_template': ELASTICSEARCH_LOG_ID_TEMPLATE, + 'filename_template': FILENAME_TEMPLATE, + 'end_of_log_mark': ELASTICSEARCH_END_OF_LOG_MARK, + 'host': ELASTICSEARCH_HOST, + 'frontend': ELASTICSEARCH_FRONTEND, + 'write_stdout': ELASTICSEARCH_WRITE_STDOUT, + 'json_format': ELASTICSEARCH_JSON_FORMAT, + 'json_fields': ELASTICSEARCH_JSON_FIELDS, + 'host_field': ELASTICSEARCH_HOST_FIELD, + 'offset_field': ELASTICSEARCH_OFFSET_FIELD, + }, + } + + DEFAULT_LOGGING_CONFIG['handlers'].update(ELASTIC_REMOTE_HANDLERS) diff --git a/docs/apache-airflow/logging-monitoring/logging-tasks.rst b/docs/apache-airflow/logging-monitoring/logging-tasks.rst index e5b0335651..6078b06179 100644 --- a/docs/apache-airflow/logging-monitoring/logging-tasks.rst +++ b/docs/apache-airflow/logging-monitoring/logging-tasks.rst @@ -98,7 +98,7 @@ Follow the steps below to enable custom logging config class: .. code-block:: python from copy import deepcopy - from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG + from airflow.logging_config import DEFAULT_LOGGING_CONFIG LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG) diff --git a/newsfragments/9092.significant.rst b/newsfragments/9092.significant.rst new file mode 100644 index 0000000000..47ffc753aa --- /dev/null +++ b/newsfragments/9092.significant.rst @@ -0,0 +1,8 @@ +Default logging config has moved to a new module + +The default logging config dict has moved from +``airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG`` to +``airflow.logging_config.DEFAULT_LOGGING_CONFIG``. + +Most users will not need to take any action as a result of this change. (If you have a custom logging config +which imports the base one you will receive a deprecation warning.) 
diff --git a/tests/api_connexion/endpoints/test_log_endpoint.py b/tests/api_connexion/endpoints/test_log_endpoint.py index 1b226be96f..d432540662 100644 --- a/tests/api_connexion/endpoints/test_log_endpoint.py +++ b/tests/api_connexion/endpoints/test_log_endpoint.py @@ -25,7 +25,7 @@ from itsdangerous.url_safe import URLSafeSerializer from airflow import DAG from airflow.api_connexion.exceptions import EXCEPTIONS_LINK_MAP -from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG +from airflow.logging_config import DEFAULT_LOGGING_CONFIG from airflow.operators.empty import EmptyOperator from airflow.security import permissions from airflow.utils import timezone diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py index 891df4ec64..bb60999deb 100644 --- a/tests/dag_processing/test_manager.py +++ b/tests/dag_processing/test_manager.py @@ -37,7 +37,6 @@ from freezegun import freeze_time from sqlalchemy import func from airflow.callbacks.callback_requests import CallbackRequest, DagCallbackRequest, SlaCallbackRequest -from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG from airflow.configuration import conf from airflow.dag_processing.manager import ( DagFileProcessorAgent, @@ -47,6 +46,7 @@ from airflow.dag_processing.manager import ( DagParsingStat, ) from airflow.dag_processing.processor import DagFileProcessorProcess +from airflow.logging_config import DEFAULT_LOGGING_CONFIG from airflow.models import DagBag, DagModel, DbCallbackRequest, errors from airflow.models.dagcode import DagCode from airflow.models.serialized_dag import SerializedDagModel diff --git a/tests/task/task_runner/test_standard_task_runner.py b/tests/task/task_runner/test_standard_task_runner.py index 2ed266a543..f561f3dc33 100644 --- a/tests/task/task_runner/test_standard_task_runner.py +++ b/tests/task/task_runner/test_standard_task_runner.py @@ -24,8 +24,8 @@ from unittest import mock import psutil import pytest -from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG from airflow.jobs.local_task_job import LocalTaskJob +from airflow.logging_config import DEFAULT_LOGGING_CONFIG from airflow.models.dagbag import DagBag from airflow.models.taskinstance import TaskInstance from airflow.task.task_runner.standard_task_runner import StandardTaskRunner diff --git a/tests/utils/log/test_log_reader.py b/tests/utils/log/test_log_reader.py index 9a76ada725..1e19607e5b 100644 --- a/tests/utils/log/test_log_reader.py +++ b/tests/utils/log/test_log_reader.py @@ -27,7 +27,7 @@ import pendulum import pytest from airflow import settings -from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG +from airflow.logging_config import DEFAULT_LOGGING_CONFIG from airflow.models import DagRun from airflow.models.tasklog import LogTemplate from airflow.operators.python import PythonOperator diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py index 28b9c7cf1a..979c66b09a 100644 --- a/tests/utils/test_log_handlers.py +++ b/tests/utils/test_log_handlers.py @@ -21,7 +21,7 @@ import logging.config import os import re -from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG +from airflow.logging_config import DEFAULT_LOGGING_CONFIG from airflow.models import DAG, DagRun, TaskInstance from airflow.operators.python import PythonOperator from airflow.utils.log.file_task_handler import FileTaskHandler diff --git 
a/tests/utils/test_task_handler_with_custom_formatter.py b/tests/utils/test_task_handler_with_custom_formatter.py index df7e174138..d4b8c626eb 100644 --- a/tests/utils/test_task_handler_with_custom_formatter.py +++ b/tests/utils/test_task_handler_with_custom_formatter.py @@ -19,7 +19,7 @@ import logging import pytest -from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG +from airflow.logging_config import DEFAULT_LOGGING_CONFIG from airflow.models import DAG, TaskInstance from airflow.operators.empty import EmptyOperator from airflow.utils.log.logging_mixin import set_context diff --git a/tests/www/views/test_views_log.py b/tests/www/views/test_views_log.py index 82b30f9d21..57c1b55be2 100644 --- a/tests/www/views/test_views_log.py +++ b/tests/www/views/test_views_log.py @@ -26,7 +26,7 @@ import urllib.parse import pytest from airflow import settings -from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG +from airflow.logging_config import DEFAULT_LOGGING_CONFIG from airflow.models import DagBag, DagRun from airflow.models.tasklog import LogTemplate from airflow.utils import timezone
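For deployments that maintain a custom logging config class, a minimal sketch of what the migration described in the newsfragment looks like, based on the updated docs snippet in this commit (the module name log_config.py and the DEBUG override below are illustrative, not part of the commit):

    # log_config.py -- hypothetical user module, pointed to via
    # [logging] logging_config_class = log_config.LOGGING_CONFIG
    from copy import deepcopy

    # New import location introduced by this commit; the old
    # airflow.config_templates.airflow_local_settings path still
    # works but now emits a DeprecationWarning.
    from airflow.logging_config import DEFAULT_LOGGING_CONFIG

    # Customise a copy of the default config rather than mutating it in place.
    LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)

    # Illustrative customisation only: make task logs more verbose.
    LOGGING_CONFIG["loggers"]["airflow.task"]["level"] = "DEBUG"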