This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch logging-settings-is-not-a-template
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit a7064f295feab056eebd094327bc1573a2b83103
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Mon Jun 1 13:53:50 2020 +0100

    airflow_local_settings is not a "template" file
    
    All of the other files in config_templates are exactly that -- templates --
    but airflow_local_settings was the actual file used by running installs.
    
    For the sake of clarity, let's move it into airflow/logging_config.py.
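    
    For anyone maintaining a custom logging config that extends the default,
    only the import changes. A minimal sketch, following the pattern shown in
    the task-logging docs (the DEBUG tweak at the end is just an example):
    
        from copy import deepcopy
        
        # Old path -- still importable via the shim, but now emits a DeprecationWarning:
        # from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
        # New path:
        from airflow.logging_config import DEFAULT_LOGGING_CONFIG
        
        # Copy the default dict, customise it, and point the
        # logging_config_class option at the resulting object.
        LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)
        LOGGING_CONFIG["loggers"]["airflow.task"]["level"] = "DEBUG"  # example tweak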
---
 airflow/config_templates/airflow_local_settings.py | 283 +--------------------
 airflow/configuration.py                           |  12 +-
 airflow/logging_config.py                          | 282 +++++++++++++++++++-
 .../logging-monitoring/logging-tasks.rst           |   2 +-
 newsfragments/9092.significant.rst                 |   8 +
 tests/api_connexion/endpoints/test_log_endpoint.py |   2 +-
 tests/dag_processing/test_manager.py               |   2 +-
 .../task/task_runner/test_standard_task_runner.py  |   2 +-
 tests/utils/log/test_log_reader.py                 |   2 +-
 tests/utils/test_log_handlers.py                   |   2 +-
 .../test_task_handler_with_custom_formatter.py     |   2 +-
 tests/www/views/test_views_log.py                  |   2 +-
 12 files changed, 314 insertions(+), 287 deletions(-)

diff --git a/airflow/config_templates/airflow_local_settings.py b/airflow/config_templates/airflow_local_settings.py
index 6684fd18e5..bcb604d7fd 100644
--- a/airflow/config_templates/airflow_local_settings.py
+++ b/airflow/config_templates/airflow_local_settings.py
@@ -15,283 +15,14 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""Airflow logging settings"""
+"""This module is deprecated. Please use `airflow.logging_config` instead"""
 
-import os
-from pathlib import Path
-from typing import Any, Dict, Optional, Union
-from urllib.parse import urlparse
+import warnings
 
-from airflow.configuration import conf
-from airflow.exceptions import AirflowException
+from airflow.logging_config import *  # noqa
 
-# TODO: Logging format and level should be configured
-# in this file instead of from airflow.cfg. Currently
-# there are other log format and level configurations in
-# settings.py and cli.py. Please see AIRFLOW-1455.
-LOG_LEVEL: str = conf.get_mandatory_value('logging', 'LOGGING_LEVEL').upper()
-
-
-# Flask appbuilder's info level log is very verbose,
-# so it's set to 'WARN' by default.
-FAB_LOG_LEVEL: str = conf.get_mandatory_value('logging', 'FAB_LOGGING_LEVEL').upper()
-
-LOG_FORMAT: str = conf.get_mandatory_value('logging', 'LOG_FORMAT')
-
-COLORED_LOG_FORMAT: str = conf.get_mandatory_value('logging', 'COLORED_LOG_FORMAT')
-
-COLORED_LOG: bool = conf.getboolean('logging', 'COLORED_CONSOLE_LOG')
-
-COLORED_FORMATTER_CLASS: str = conf.get_mandatory_value('logging', 'COLORED_FORMATTER_CLASS')
-
-BASE_LOG_FOLDER: str = conf.get_mandatory_value('logging', 'BASE_LOG_FOLDER')
-
-PROCESSOR_LOG_FOLDER: str = conf.get_mandatory_value('scheduler', 'CHILD_PROCESS_LOG_DIRECTORY')
-
-DAG_PROCESSOR_MANAGER_LOG_LOCATION: str = conf.get_mandatory_value(
-    'logging', 'DAG_PROCESSOR_MANAGER_LOG_LOCATION'
+warnings.warn(
+    "This module is deprecated. Please use `airflow.logging_config`.",
+    DeprecationWarning,
+    stacklevel=2,
 )
-
-FILENAME_TEMPLATE: str = conf.get_mandatory_value('logging', 'LOG_FILENAME_TEMPLATE')
-
-PROCESSOR_FILENAME_TEMPLATE: str = conf.get_mandatory_value('logging', 'LOG_PROCESSOR_FILENAME_TEMPLATE')
-
-DEFAULT_LOGGING_CONFIG: Dict[str, Any] = {
-    'version': 1,
-    'disable_existing_loggers': False,
-    'formatters': {
-        'airflow': {'format': LOG_FORMAT},
-        'airflow_coloured': {
-            'format': COLORED_LOG_FORMAT if COLORED_LOG else LOG_FORMAT,
-            'class': COLORED_FORMATTER_CLASS if COLORED_LOG else 'logging.Formatter',
-        },
-    },
-    'filters': {
-        'mask_secrets': {
-            '()': 'airflow.utils.log.secrets_masker.SecretsMasker',
-        },
-    },
-    'handlers': {
-        'console': {
-            'class': 'airflow.utils.log.logging_mixin.RedirectStdHandler',
-            'formatter': 'airflow_coloured',
-            'stream': 'sys.stdout',
-            'filters': ['mask_secrets'],
-        },
-        'task': {
-            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
-            'formatter': 'airflow',
-            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
-            'filters': ['mask_secrets'],
-        },
-        'processor': {
-            'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
-            'formatter': 'airflow',
-            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
-            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
-            'filters': ['mask_secrets'],
-        },
-    },
-    'loggers': {
-        'airflow.processor': {
-            'handlers': ['processor'],
-            'level': LOG_LEVEL,
-            'propagate': False,
-        },
-        'airflow.task': {
-            'handlers': ['task'],
-            'level': LOG_LEVEL,
-            'propagate': False,
-            'filters': ['mask_secrets'],
-        },
-        'flask_appbuilder': {
-            'handlers': ['console'],
-            'level': FAB_LOG_LEVEL,
-            'propagate': True,
-        },
-    },
-    'root': {
-        'handlers': ['console'],
-        'level': LOG_LEVEL,
-        'filters': ['mask_secrets'],
-    },
-}
-
-EXTRA_LOGGER_NAMES: Optional[str] = conf.get('logging', 'EXTRA_LOGGER_NAMES', fallback=None)
-if EXTRA_LOGGER_NAMES:
-    new_loggers = {
-        logger_name.strip(): {
-            'handlers': ['console'],
-            'level': LOG_LEVEL,
-            'propagate': True,
-        }
-        for logger_name in EXTRA_LOGGER_NAMES.split(",")
-    }
-    DEFAULT_LOGGING_CONFIG['loggers'].update(new_loggers)
-
-DEFAULT_DAG_PARSING_LOGGING_CONFIG: Dict[str, Dict[str, Dict[str, Any]]] = {
-    'handlers': {
-        'processor_manager': {
-            'class': 'logging.handlers.RotatingFileHandler',
-            'formatter': 'airflow',
-            'filename': DAG_PROCESSOR_MANAGER_LOG_LOCATION,
-            'mode': 'a',
-            'maxBytes': 104857600,  # 100MB
-            'backupCount': 5,
-        }
-    },
-    'loggers': {
-        'airflow.processor_manager': {
-            'handlers': ['processor_manager'],
-            'level': LOG_LEVEL,
-            'propagate': False,
-        }
-    },
-}
-
-# Only update the handlers and loggers when CONFIG_PROCESSOR_MANAGER_LOGGER is set.
-# This is to avoid exceptions when initializing RotatingFileHandler multiple times
-# in multiple processes.
-if os.environ.get('CONFIG_PROCESSOR_MANAGER_LOGGER') == 'True':
-    DEFAULT_LOGGING_CONFIG['handlers'].update(DEFAULT_DAG_PARSING_LOGGING_CONFIG['handlers'])
-    DEFAULT_LOGGING_CONFIG['loggers'].update(DEFAULT_DAG_PARSING_LOGGING_CONFIG['loggers'])
-
-    # Manually create log directory for processor_manager handler as RotatingFileHandler
-    # will only create file but not the directory.
-    processor_manager_handler_config: Dict[str, Any] = DEFAULT_DAG_PARSING_LOGGING_CONFIG['handlers'][
-        'processor_manager'
-    ]
-    directory: str = os.path.dirname(processor_manager_handler_config['filename'])
-    Path(directory).mkdir(parents=True, exist_ok=True, mode=0o755)
-
-##################
-# Remote logging #
-##################
-
-REMOTE_LOGGING: bool = conf.getboolean('logging', 'remote_logging')
-
-if REMOTE_LOGGING:
-
-    ELASTICSEARCH_HOST: Optional[str] = conf.get('elasticsearch', 'HOST')
-
-    # Storage bucket URL for remote logging
-    # S3 buckets should start with "s3://"
-    # Cloudwatch log groups should start with "cloudwatch://"
-    # GCS buckets should start with "gs://"
-    # WASB buckets should start with "wasb"
-    # just to help Airflow select correct handler
-    REMOTE_BASE_LOG_FOLDER: str = conf.get_mandatory_value('logging', 'REMOTE_BASE_LOG_FOLDER')
-
-    if REMOTE_BASE_LOG_FOLDER.startswith('s3://'):
-        S3_REMOTE_HANDLERS: Dict[str, Dict[str, str]] = {
-            'task': {
-                'class': 'airflow.providers.amazon.aws.log.s3_task_handler.S3TaskHandler',
-                'formatter': 'airflow',
-                'base_log_folder': str(os.path.expanduser(BASE_LOG_FOLDER)),
-                's3_log_folder': REMOTE_BASE_LOG_FOLDER,
-                'filename_template': FILENAME_TEMPLATE,
-            },
-        }
-
-        DEFAULT_LOGGING_CONFIG['handlers'].update(S3_REMOTE_HANDLERS)
-    elif REMOTE_BASE_LOG_FOLDER.startswith('cloudwatch://'):
-        url_parts = urlparse(REMOTE_BASE_LOG_FOLDER)
-        CLOUDWATCH_REMOTE_HANDLERS: Dict[str, Dict[str, str]] = {
-            'task': {
-                'class': 'airflow.providers.amazon.aws.log.cloudwatch_task_handler.CloudwatchTaskHandler',
-                'formatter': 'airflow',
-                'base_log_folder': str(os.path.expanduser(BASE_LOG_FOLDER)),
-                'log_group_arn': url_parts.netloc + url_parts.path,
-                'filename_template': FILENAME_TEMPLATE,
-            },
-        }
-
-        DEFAULT_LOGGING_CONFIG['handlers'].update(CLOUDWATCH_REMOTE_HANDLERS)
-    elif REMOTE_BASE_LOG_FOLDER.startswith('gs://'):
-        key_path = conf.get_mandatory_value('logging', 'GOOGLE_KEY_PATH', fallback=None)
-        GCS_REMOTE_HANDLERS: Dict[str, Dict[str, str]] = {
-            'task': {
-                'class': 'airflow.providers.google.cloud.log.gcs_task_handler.GCSTaskHandler',
-                'formatter': 'airflow',
-                'base_log_folder': str(os.path.expanduser(BASE_LOG_FOLDER)),
-                'gcs_log_folder': REMOTE_BASE_LOG_FOLDER,
-                'filename_template': FILENAME_TEMPLATE,
-                'gcp_key_path': key_path,
-            },
-        }
-
-        DEFAULT_LOGGING_CONFIG['handlers'].update(GCS_REMOTE_HANDLERS)
-    elif REMOTE_BASE_LOG_FOLDER.startswith('wasb'):
-        WASB_REMOTE_HANDLERS: Dict[str, Dict[str, Union[str, bool]]] = {
-            'task': {
-                'class': 'airflow.providers.microsoft.azure.log.wasb_task_handler.WasbTaskHandler',
-                'formatter': 'airflow',
-                'base_log_folder': str(os.path.expanduser(BASE_LOG_FOLDER)),
-                'wasb_log_folder': REMOTE_BASE_LOG_FOLDER,
-                'wasb_container': 'airflow-logs',
-                'filename_template': FILENAME_TEMPLATE,
-                'delete_local_copy': False,
-            },
-        }
-
-        DEFAULT_LOGGING_CONFIG['handlers'].update(WASB_REMOTE_HANDLERS)
-    elif REMOTE_BASE_LOG_FOLDER.startswith('stackdriver://'):
-        key_path = conf.get_mandatory_value('logging', 'GOOGLE_KEY_PATH', fallback=None)
-        # stackdriver:///airflow-tasks => airflow-tasks
-        log_name = urlparse(REMOTE_BASE_LOG_FOLDER).path[1:]
-        STACKDRIVER_REMOTE_HANDLERS = {
-            'task': {
-                'class': 'airflow.providers.google.cloud.log.stackdriver_task_handler.StackdriverTaskHandler',
-                'formatter': 'airflow',
-                'name': log_name,
-                'gcp_key_path': key_path,
-            }
-        }
-
-        DEFAULT_LOGGING_CONFIG['handlers'].update(STACKDRIVER_REMOTE_HANDLERS)
-    elif REMOTE_BASE_LOG_FOLDER.startswith('oss://'):
-        OSS_REMOTE_HANDLERS = {
-            'task': {
-                'class': 'airflow.providers.alibaba.cloud.log.oss_task_handler.OSSTaskHandler',
-                'formatter': 'airflow',
-                'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
-                'oss_log_folder': REMOTE_BASE_LOG_FOLDER,
-                'filename_template': FILENAME_TEMPLATE,
-            },
-        }
-        DEFAULT_LOGGING_CONFIG['handlers'].update(OSS_REMOTE_HANDLERS)
-    elif ELASTICSEARCH_HOST:
-        ELASTICSEARCH_LOG_ID_TEMPLATE: str = conf.get_mandatory_value('elasticsearch', 'LOG_ID_TEMPLATE')
-        ELASTICSEARCH_END_OF_LOG_MARK: str = conf.get_mandatory_value('elasticsearch', 'END_OF_LOG_MARK')
-        ELASTICSEARCH_FRONTEND: str = conf.get_mandatory_value('elasticsearch', 'frontend')
-        ELASTICSEARCH_WRITE_STDOUT: bool = conf.getboolean('elasticsearch', 'WRITE_STDOUT')
-        ELASTICSEARCH_JSON_FORMAT: bool = conf.getboolean('elasticsearch', 'JSON_FORMAT')
-        ELASTICSEARCH_JSON_FIELDS: str = conf.get_mandatory_value('elasticsearch', 'JSON_FIELDS')
-        ELASTICSEARCH_HOST_FIELD: str = conf.get_mandatory_value('elasticsearch', 'HOST_FIELD')
-        ELASTICSEARCH_OFFSET_FIELD: str = conf.get_mandatory_value('elasticsearch', 'OFFSET_FIELD')
-
-        ELASTIC_REMOTE_HANDLERS: Dict[str, Dict[str, Union[str, bool]]] = {
-            'task': {
-                'class': 'airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler',
-                'formatter': 'airflow',
-                'base_log_folder': str(os.path.expanduser(BASE_LOG_FOLDER)),
-                'log_id_template': ELASTICSEARCH_LOG_ID_TEMPLATE,
-                'filename_template': FILENAME_TEMPLATE,
-                'end_of_log_mark': ELASTICSEARCH_END_OF_LOG_MARK,
-                'host': ELASTICSEARCH_HOST,
-                'frontend': ELASTICSEARCH_FRONTEND,
-                'write_stdout': ELASTICSEARCH_WRITE_STDOUT,
-                'json_format': ELASTICSEARCH_JSON_FORMAT,
-                'json_fields': ELASTICSEARCH_JSON_FIELDS,
-                'host_field': ELASTICSEARCH_HOST_FIELD,
-                'offset_field': ELASTICSEARCH_OFFSET_FIELD,
-            },
-        }
-
-        DEFAULT_LOGGING_CONFIG['handlers'].update(ELASTIC_REMOTE_HANDLERS)
-    else:
-        raise AirflowException(
-            "Incorrect remote log configuration. Please check the configuration of option 'host' in "
-            "section 'elasticsearch' if you are using Elasticsearch. In the other case, "
-            "'remote_base_log_folder' option in the 'logging' section."
-        )
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 84df81812c..813f1a39eb 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -228,6 +228,11 @@ class AirflowConfigParser(ConfigParser):
     deprecated_values: Dict[str, Dict[str, Tuple[Pattern, str, str]]] = {
         'core': {
             'hostname_callable': (re.compile(r':'), r'.', '2.1'),
+            'logging_config_class': (
+                re.compile(r'\Aairflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG\Z'),
+                'airflow.logging_config.DEFAULT_LOGGING_CONFIG',
+                '3.0',
+            ),
         },
         'webserver': {
             'navbar_color': (re.compile(r'\A#007A87\Z', re.IGNORECASE), '#fff', '2.1'),
@@ -246,6 +251,11 @@ class AirflowConfigParser(ConfigParser):
                 "XX-set-after-default-config-loaded-XX",
                 '3.0',
             ),
+            'logging_config_class': (
+                re.compile(r'\Aairflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG\Z'),
+                'airflow.logging_config.DEFAULT_LOGGING_CONFIG',
+                '3.0',
+            ),
         },
         'api': {
             'auth_backends': (
@@ -422,7 +432,7 @@ class AirflowConfigParser(ConfigParser):
                 )
 
     def _using_old_value(self, old: Pattern, current_value: str) -> bool:
-        return old.search(current_value) is not None
+        return current_value is not None and old.search(current_value) is not None
 
     def _update_env_var(self, section: str, name: str, new_value: Union[str]):
         env_var = self._env_var_name(section, name)
diff --git a/airflow/logging_config.py b/airflow/logging_config.py
index 645e53eb3e..0b35eade9b 100644
--- a/airflow/logging_config.py
+++ b/airflow/logging_config.py
@@ -17,8 +17,12 @@
 # under the License.
 #
 import logging
+import os
 import warnings
 from logging.config import dictConfig
+from pathlib import Path
+from typing import Any, Dict, Optional, Union
+from urllib.parse import urlparse
 
 from airflow.configuration import conf
 from airflow.exceptions import AirflowConfigException
@@ -48,8 +52,8 @@ def configure_logging():
             # Import default logging configurations.
             raise ImportError(f'Unable to load custom logging from {logging_class_path} due to {err}')
     else:
-        logging_class_path = 'airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG'
-        logging_config = import_string(logging_class_path)
+        logging_config = DEFAULT_LOGGING_CONFIG
+        logging_class_path = 'airflow.logging_config.DEFAULT_LOGGING_CONFIG'
         log.debug('Unable to load custom logging, using default config instead')
 
     try:
@@ -82,6 +86,12 @@ def validate_logging_config(logging_config):
     # Now lets validate the other logging-related settings
     task_log_reader = conf.get('logging', 'task_log_reader')
 
+    if REMOTE_LOGGING and logging_config['handlers']['task'] is __stock_task_handler:
+        raise AirflowConfigException(
+            "[logging] remote_logging config was enabled, but no suitable backend was detected. Please check"
+            " the 'remote_base_log_folder' setting"
+        )
+
     logger = logging.getLogger('airflow.task')
 
     def _get_handler(name):
@@ -102,3 +112,271 @@ def validate_logging_config(logging_config):
                 f"Configured task_log_reader {task_log_reader!r} was not a handler of "
                 f"the 'airflow.task' logger."
             )
+
+
+# TODO: Logging format and level should be configured
+# in this file instead of from airflow.cfg. Currently
+# there are other log format and level configurations in
+# settings.py and cli.py. Please see AIRFLOW-1455.
+LOG_LEVEL: str = conf.get_mandatory_value('logging', 'LOGGING_LEVEL').upper()
+
+
+# Flask appbuilder's info level log is very verbose,
+# so it's set to 'WARN' by default.
+FAB_LOG_LEVEL: str = conf.get_mandatory_value('logging', 'FAB_LOGGING_LEVEL').upper()
+
+LOG_FORMAT: str = conf.get_mandatory_value('logging', 'LOG_FORMAT')
+
+COLORED_LOG_FORMAT: str = conf.get_mandatory_value('logging', 'COLORED_LOG_FORMAT')
+
+COLORED_LOG: bool = conf.getboolean('logging', 'COLORED_CONSOLE_LOG')
+
+COLORED_FORMATTER_CLASS: str = conf.get_mandatory_value('logging', 'COLORED_FORMATTER_CLASS')
+
+BASE_LOG_FOLDER: str = conf.get_mandatory_value('logging', 'BASE_LOG_FOLDER')
+
+PROCESSOR_LOG_FOLDER: str = conf.get_mandatory_value('scheduler', 'CHILD_PROCESS_LOG_DIRECTORY')
+
+DAG_PROCESSOR_MANAGER_LOG_LOCATION: str = conf.get_mandatory_value(
+    'logging', 'DAG_PROCESSOR_MANAGER_LOG_LOCATION'
+)
+
+FILENAME_TEMPLATE: str = conf.get_mandatory_value('logging', 'LOG_FILENAME_TEMPLATE')
+
+PROCESSOR_FILENAME_TEMPLATE: str = conf.get_mandatory_value('logging', 'LOG_PROCESSOR_FILENAME_TEMPLATE')
+
+DEFAULT_LOGGING_CONFIG: Dict[str, Any] = {
+    'version': 1,
+    'disable_existing_loggers': False,
+    'formatters': {
+        'airflow': {'format': LOG_FORMAT},
+        'airflow_coloured': {
+            'format': COLORED_LOG_FORMAT if COLORED_LOG else LOG_FORMAT,
+            'class': COLORED_FORMATTER_CLASS if COLORED_LOG else 'logging.Formatter',
+        },
+    },
+    'filters': {
+        'mask_secrets': {
+            '()': 'airflow.utils.log.secrets_masker.SecretsMasker',
+        },
+    },
+    'handlers': {
+        'console': {
+            'class': 'airflow.utils.log.logging_mixin.RedirectStdHandler',
+            'formatter': 'airflow_coloured',
+            'stream': 'sys.stdout',
+            'filters': ['mask_secrets'],
+        },
+        'task': {
+            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
+            'formatter': 'airflow',
+            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
+            'filters': ['mask_secrets'],
+        },
+        'processor': {
+            'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
+            'formatter': 'airflow',
+            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
+            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
+            'filters': ['mask_secrets'],
+        },
+    },
+    'loggers': {
+        'airflow.processor': {
+            'handlers': ['processor'],
+            'level': LOG_LEVEL,
+            'propagate': False,
+        },
+        'airflow.task': {
+            'handlers': ['task'],
+            'level': LOG_LEVEL,
+            'propagate': False,
+            'filters': ['mask_secrets'],
+        },
+        'flask_appbuilder': {
+            'handlers': ['console'],
+            'level': FAB_LOG_LEVEL,
+            'propagate': True,
+        },
+    },
+    'root': {
+        'handlers': ['console'],
+        'level': LOG_LEVEL,
+        'filters': ['mask_secrets'],
+    },
+}
+
+__stock_task_handler = DEFAULT_LOGGING_CONFIG['handlers']['task']
+
+EXTRA_LOGGER_NAMES: Optional[str] = conf.get('logging', 'EXTRA_LOGGER_NAMES', fallback=None)
+if EXTRA_LOGGER_NAMES:
+    new_loggers = {
+        logger_name.strip(): {
+            'handlers': ['console'],
+            'level': LOG_LEVEL,
+            'propagate': True,
+        }
+        for logger_name in EXTRA_LOGGER_NAMES.split(",")
+    }
+    DEFAULT_LOGGING_CONFIG['loggers'].update(new_loggers)
+
+DEFAULT_DAG_PARSING_LOGGING_CONFIG: Dict[str, Dict[str, Dict[str, Any]]] = {
+    'handlers': {
+        'processor_manager': {
+            'class': 'logging.handlers.RotatingFileHandler',
+            'formatter': 'airflow',
+            'filename': DAG_PROCESSOR_MANAGER_LOG_LOCATION,
+            'mode': 'a',
+            'maxBytes': 104857600,  # 100MB
+            'backupCount': 5,
+        }
+    },
+    'loggers': {
+        'airflow.processor_manager': {
+            'handlers': ['processor_manager'],
+            'level': LOG_LEVEL,
+            'propagate': False,
+        }
+    },
+}
+
+# Only update the handlers and loggers when CONFIG_PROCESSOR_MANAGER_LOGGER is set.
+# This is to avoid exceptions when initializing RotatingFileHandler multiple times
+# in multiple processes.
+if os.environ.get('CONFIG_PROCESSOR_MANAGER_LOGGER') == 'True':
+    DEFAULT_LOGGING_CONFIG['handlers'].update(DEFAULT_DAG_PARSING_LOGGING_CONFIG['handlers'])
+    DEFAULT_LOGGING_CONFIG['loggers'].update(DEFAULT_DAG_PARSING_LOGGING_CONFIG['loggers'])
+
+    # Manually create log directory for processor_manager handler as RotatingFileHandler
+    # will only create file but not the directory.
+    processor_manager_handler_config: Dict[str, Any] = DEFAULT_DAG_PARSING_LOGGING_CONFIG['handlers'][
+        'processor_manager'
+    ]
+    directory: str = os.path.dirname(processor_manager_handler_config['filename'])
+    Path(directory).mkdir(parents=True, exist_ok=True, mode=0o755)
+
+##################
+# Remote logging #
+##################
+
+REMOTE_LOGGING: bool = conf.getboolean('logging', 'remote_logging')
+
+if REMOTE_LOGGING:
+
+    ELASTICSEARCH_HOST: Optional[str] = conf.get('elasticsearch', 'HOST')
+
+    # Storage bucket URL for remote logging
+    # S3 buckets should start with "s3://"
+    # Cloudwatch log groups should start with "cloudwatch://"
+    # GCS buckets should start with "gs://"
+    # WASB buckets should start with "wasb"
+    # just to help Airflow select correct handler
+    REMOTE_BASE_LOG_FOLDER: str = conf.get_mandatory_value('logging', 'REMOTE_BASE_LOG_FOLDER')
+
+    if REMOTE_BASE_LOG_FOLDER.startswith('s3://'):
+        S3_REMOTE_HANDLERS: Dict[str, Dict[str, str]] = {
+            'task': {
+                'class': 'airflow.providers.amazon.aws.log.s3_task_handler.S3TaskHandler',
+                'formatter': 'airflow',
+                'base_log_folder': str(os.path.expanduser(BASE_LOG_FOLDER)),
+                's3_log_folder': REMOTE_BASE_LOG_FOLDER,
+                'filename_template': FILENAME_TEMPLATE,
+            },
+        }
+
+        DEFAULT_LOGGING_CONFIG['handlers'].update(S3_REMOTE_HANDLERS)
+    elif REMOTE_BASE_LOG_FOLDER.startswith('cloudwatch://'):
+        url_parts = urlparse(REMOTE_BASE_LOG_FOLDER)
+        CLOUDWATCH_REMOTE_HANDLERS: Dict[str, Dict[str, str]] = {
+            'task': {
+                'class': 'airflow.providers.amazon.aws.log.cloudwatch_task_handler.CloudwatchTaskHandler',
+                'formatter': 'airflow',
+                'base_log_folder': str(os.path.expanduser(BASE_LOG_FOLDER)),
+                'log_group_arn': url_parts.netloc + url_parts.path,
+                'filename_template': FILENAME_TEMPLATE,
+            },
+        }
+
+        DEFAULT_LOGGING_CONFIG['handlers'].update(CLOUDWATCH_REMOTE_HANDLERS)
+    elif REMOTE_BASE_LOG_FOLDER.startswith('gs://'):
+        key_path = conf.get_mandatory_value('logging', 'GOOGLE_KEY_PATH', fallback=None)
+        GCS_REMOTE_HANDLERS: Dict[str, Dict[str, str]] = {
+            'task': {
+                'class': 'airflow.providers.google.cloud.log.gcs_task_handler.GCSTaskHandler',
+                'formatter': 'airflow',
+                'base_log_folder': str(os.path.expanduser(BASE_LOG_FOLDER)),
+                'gcs_log_folder': REMOTE_BASE_LOG_FOLDER,
+                'filename_template': FILENAME_TEMPLATE,
+                'gcp_key_path': key_path,
+            },
+        }
+
+        DEFAULT_LOGGING_CONFIG['handlers'].update(GCS_REMOTE_HANDLERS)
+    elif REMOTE_BASE_LOG_FOLDER.startswith('wasb'):
+        WASB_REMOTE_HANDLERS: Dict[str, Dict[str, Union[str, bool]]] = {
+            'task': {
+                'class': 'airflow.providers.microsoft.azure.log.wasb_task_handler.WasbTaskHandler',
+                'formatter': 'airflow',
+                'base_log_folder': str(os.path.expanduser(BASE_LOG_FOLDER)),
+                'wasb_log_folder': REMOTE_BASE_LOG_FOLDER,
+                'wasb_container': 'airflow-logs',
+                'filename_template': FILENAME_TEMPLATE,
+                'delete_local_copy': False,
+            },
+        }
+
+        DEFAULT_LOGGING_CONFIG['handlers'].update(WASB_REMOTE_HANDLERS)
+    elif REMOTE_BASE_LOG_FOLDER.startswith('stackdriver://'):
+        key_path = conf.get_mandatory_value('logging', 'GOOGLE_KEY_PATH', fallback=None)
+        # stackdriver:///airflow-tasks => airflow-tasks
+        log_name = urlparse(REMOTE_BASE_LOG_FOLDER).path[1:]
+        STACKDRIVER_REMOTE_HANDLERS = {
+            'task': {
+                'class': 'airflow.providers.google.cloud.log.stackdriver_task_handler.StackdriverTaskHandler',
+                'formatter': 'airflow',
+                'name': log_name,
+                'gcp_key_path': key_path,
+            }
+        }
+
+        DEFAULT_LOGGING_CONFIG['handlers'].update(STACKDRIVER_REMOTE_HANDLERS)
+    elif REMOTE_BASE_LOG_FOLDER.startswith('oss://'):
+        OSS_REMOTE_HANDLERS = {
+            'task': {
+                'class': 'airflow.providers.alibaba.cloud.log.oss_task_handler.OSSTaskHandler',
+                'formatter': 'airflow',
+                'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
+                'oss_log_folder': REMOTE_BASE_LOG_FOLDER,
+                'filename_template': FILENAME_TEMPLATE,
+            },
+        }
+        DEFAULT_LOGGING_CONFIG['handlers'].update(OSS_REMOTE_HANDLERS)
+    elif ELASTICSEARCH_HOST:
+        ELASTICSEARCH_LOG_ID_TEMPLATE: str = conf.get_mandatory_value('elasticsearch', 'LOG_ID_TEMPLATE')
+        ELASTICSEARCH_END_OF_LOG_MARK: str = conf.get_mandatory_value('elasticsearch', 'END_OF_LOG_MARK')
+        ELASTICSEARCH_FRONTEND: str = conf.get_mandatory_value('elasticsearch', 'frontend')
+        ELASTICSEARCH_WRITE_STDOUT: bool = conf.getboolean('elasticsearch', 'WRITE_STDOUT')
+        ELASTICSEARCH_JSON_FORMAT: bool = conf.getboolean('elasticsearch', 'JSON_FORMAT')
+        ELASTICSEARCH_JSON_FIELDS: str = conf.get_mandatory_value('elasticsearch', 'JSON_FIELDS')
+        ELASTICSEARCH_HOST_FIELD: str = conf.get_mandatory_value('elasticsearch', 'HOST_FIELD')
+        ELASTICSEARCH_OFFSET_FIELD: str = conf.get_mandatory_value('elasticsearch', 'OFFSET_FIELD')
+
+        ELASTIC_REMOTE_HANDLERS: Dict[str, Dict[str, Union[str, bool]]] = {
+            'task': {
+                'class': 'airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler',
+                'formatter': 'airflow',
+                'base_log_folder': str(os.path.expanduser(BASE_LOG_FOLDER)),
+                'log_id_template': ELASTICSEARCH_LOG_ID_TEMPLATE,
+                'filename_template': FILENAME_TEMPLATE,
+                'end_of_log_mark': ELASTICSEARCH_END_OF_LOG_MARK,
+                'host': ELASTICSEARCH_HOST,
+                'frontend': ELASTICSEARCH_FRONTEND,
+                'write_stdout': ELASTICSEARCH_WRITE_STDOUT,
+                'json_format': ELASTICSEARCH_JSON_FORMAT,
+                'json_fields': ELASTICSEARCH_JSON_FIELDS,
+                'host_field': ELASTICSEARCH_HOST_FIELD,
+                'offset_field': ELASTICSEARCH_OFFSET_FIELD,
+            },
+        }
+
+        DEFAULT_LOGGING_CONFIG['handlers'].update(ELASTIC_REMOTE_HANDLERS)
diff --git a/docs/apache-airflow/logging-monitoring/logging-tasks.rst b/docs/apache-airflow/logging-monitoring/logging-tasks.rst
index e5b0335651..6078b06179 100644
--- a/docs/apache-airflow/logging-monitoring/logging-tasks.rst
+++ b/docs/apache-airflow/logging-monitoring/logging-tasks.rst
@@ -98,7 +98,7 @@ Follow the steps below to enable custom logging config class:
     .. code-block:: python
 
       from copy import deepcopy
-      from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
+      from airflow.logging_config import DEFAULT_LOGGING_CONFIG
 
       LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)
 
diff --git a/newsfragments/9092.significant.rst b/newsfragments/9092.significant.rst
new file mode 100644
index 0000000000..47ffc753aa
--- /dev/null
+++ b/newsfragments/9092.significant.rst
@@ -0,0 +1,8 @@
+Default logging config has moved to a new module
+
+The default logging config dict has moved from
+``airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG`` to
+``airflow.logging_config.DEFAULT_LOGGING_CONFIG``.
+
+Most users will not need to take any action as a result of this change. (If you have a custom logging config
+which imports the base one you will receive a deprecation warning.)
diff --git a/tests/api_connexion/endpoints/test_log_endpoint.py b/tests/api_connexion/endpoints/test_log_endpoint.py
index 1b226be96f..d432540662 100644
--- a/tests/api_connexion/endpoints/test_log_endpoint.py
+++ b/tests/api_connexion/endpoints/test_log_endpoint.py
@@ -25,7 +25,7 @@ from itsdangerous.url_safe import URLSafeSerializer
 
 from airflow import DAG
 from airflow.api_connexion.exceptions import EXCEPTIONS_LINK_MAP
-from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
+from airflow.logging_config import DEFAULT_LOGGING_CONFIG
 from airflow.operators.empty import EmptyOperator
 from airflow.security import permissions
 from airflow.utils import timezone
diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py
index 891df4ec64..bb60999deb 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -37,7 +37,6 @@ from freezegun import freeze_time
 from sqlalchemy import func
 
 from airflow.callbacks.callback_requests import CallbackRequest, DagCallbackRequest, SlaCallbackRequest
-from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
 from airflow.configuration import conf
 from airflow.dag_processing.manager import (
     DagFileProcessorAgent,
@@ -47,6 +46,7 @@ from airflow.dag_processing.manager import (
     DagParsingStat,
 )
 from airflow.dag_processing.processor import DagFileProcessorProcess
+from airflow.logging_config import DEFAULT_LOGGING_CONFIG
 from airflow.models import DagBag, DagModel, DbCallbackRequest, errors
 from airflow.models.dagcode import DagCode
 from airflow.models.serialized_dag import SerializedDagModel
diff --git a/tests/task/task_runner/test_standard_task_runner.py b/tests/task/task_runner/test_standard_task_runner.py
index 2ed266a543..f561f3dc33 100644
--- a/tests/task/task_runner/test_standard_task_runner.py
+++ b/tests/task/task_runner/test_standard_task_runner.py
@@ -24,8 +24,8 @@ from unittest import mock
 import psutil
 import pytest
 
-from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
 from airflow.jobs.local_task_job import LocalTaskJob
+from airflow.logging_config import DEFAULT_LOGGING_CONFIG
 from airflow.models.dagbag import DagBag
 from airflow.models.taskinstance import TaskInstance
 from airflow.task.task_runner.standard_task_runner import StandardTaskRunner
diff --git a/tests/utils/log/test_log_reader.py b/tests/utils/log/test_log_reader.py
index 9a76ada725..1e19607e5b 100644
--- a/tests/utils/log/test_log_reader.py
+++ b/tests/utils/log/test_log_reader.py
@@ -27,7 +27,7 @@ import pendulum
 import pytest
 
 from airflow import settings
-from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
+from airflow.logging_config import DEFAULT_LOGGING_CONFIG
 from airflow.models import DagRun
 from airflow.models.tasklog import LogTemplate
 from airflow.operators.python import PythonOperator
diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py
index 28b9c7cf1a..979c66b09a 100644
--- a/tests/utils/test_log_handlers.py
+++ b/tests/utils/test_log_handlers.py
@@ -21,7 +21,7 @@ import logging.config
 import os
 import re
 
-from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
+from airflow.logging_config import DEFAULT_LOGGING_CONFIG
 from airflow.models import DAG, DagRun, TaskInstance
 from airflow.operators.python import PythonOperator
 from airflow.utils.log.file_task_handler import FileTaskHandler
diff --git a/tests/utils/test_task_handler_with_custom_formatter.py b/tests/utils/test_task_handler_with_custom_formatter.py
index df7e174138..d4b8c626eb 100644
--- a/tests/utils/test_task_handler_with_custom_formatter.py
+++ b/tests/utils/test_task_handler_with_custom_formatter.py
@@ -19,7 +19,7 @@ import logging
 
 import pytest
 
-from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
+from airflow.logging_config import DEFAULT_LOGGING_CONFIG
 from airflow.models import DAG, TaskInstance
 from airflow.operators.empty import EmptyOperator
 from airflow.utils.log.logging_mixin import set_context
diff --git a/tests/www/views/test_views_log.py b/tests/www/views/test_views_log.py
index 82b30f9d21..57c1b55be2 100644
--- a/tests/www/views/test_views_log.py
+++ b/tests/www/views/test_views_log.py
@@ -26,7 +26,7 @@ import urllib.parse
 import pytest
 
 from airflow import settings
-from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
+from airflow.logging_config import DEFAULT_LOGGING_CONFIG
 from airflow.models import DagBag, DagRun
 from airflow.models.tasklog import LogTemplate
 from airflow.utils import timezone
