mik-laj commented on a change in pull request #9092:
URL: https://github.com/apache/airflow/pull/9092#discussion_r433248371



##########
File path: airflow/logging_config.py
##########
@@ -99,3 +103,234 @@ def _get_handler(name):
                 "Configured task_log_reader {!r} was not a handler of the 
'airflow.task' "
                 "logger.".format(task_log_reader)
             )
+
+
+# TODO: Logging format and level should be configured
+# in this file instead of from airflow.cfg. Currently
+# there are other log format and level configurations in
+# settings.py and cli.py. Please see AIRFLOW-1455.
+LOG_LEVEL: str = conf.get('logging', 'LOGGING_LEVEL').upper()
+
+
+# Flask appbuilder's info level log is very verbose,
+# so it's set to 'WARN' by default.
+FAB_LOG_LEVEL: str = conf.get('logging', 'FAB_LOGGING_LEVEL').upper()
+
+LOG_FORMAT: str = conf.get('logging', 'LOG_FORMAT')
+
+COLORED_LOG_FORMAT: str = conf.get('logging', 'COLORED_LOG_FORMAT')
+
+COLORED_LOG: bool = conf.getboolean('logging', 'COLORED_CONSOLE_LOG')
+
+COLORED_FORMATTER_CLASS: str = conf.get('logging', 'COLORED_FORMATTER_CLASS')
+
+BASE_LOG_FOLDER: str = conf.get('logging', 'BASE_LOG_FOLDER')
+
+PROCESSOR_LOG_FOLDER: str = conf.get('scheduler', 'CHILD_PROCESS_LOG_DIRECTORY')
+
+DAG_PROCESSOR_MANAGER_LOG_LOCATION: str = conf.get('logging', 'DAG_PROCESSOR_MANAGER_LOG_LOCATION')
+
+FILENAME_TEMPLATE: str = conf.get('logging', 'LOG_FILENAME_TEMPLATE')
+
+PROCESSOR_FILENAME_TEMPLATE: str = conf.get('logging', 'LOG_PROCESSOR_FILENAME_TEMPLATE')
+
+DEFAULT_LOGGING_CONFIG: Dict[str, Any] = {
+    'version': 1,
+    'disable_existing_loggers': False,
+    'formatters': {
+        'airflow': {
+            'format': LOG_FORMAT
+        },
+        'airflow_coloured': {
+            'format': COLORED_LOG_FORMAT if COLORED_LOG else LOG_FORMAT,
+            'class': COLORED_FORMATTER_CLASS if COLORED_LOG else 'logging.Formatter'
+        },
+    },
+    'handlers': {
+        'console': {
+            'class': 'airflow.utils.log.logging_mixin.RedirectStdHandler',
+            'formatter': 'airflow_coloured',
+            'stream': 'sys.stdout'
+        },
+        'task': {
+            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
+            'formatter': 'airflow',
+            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
+            'filename_template': FILENAME_TEMPLATE,
+        },
+        'processor': {
+            'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
+            'formatter': 'airflow',
+            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
+            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
+        }
+    },
+    'loggers': {
+        'airflow.processor': {
+            'handlers': ['processor'],
+            'level': LOG_LEVEL,
+            'propagate': False,
+        },
+        'airflow.task': {
+            'handlers': ['task'],
+            'level': LOG_LEVEL,
+            'propagate': False,
+        },
+        'flask_appbuilder': {
+            'handlers': ['console'],
+            'level': FAB_LOG_LEVEL,
+            'propagate': True,
+        }
+    },
+    'root': {
+        'handlers': ['console'],
+        'level': LOG_LEVEL,
+    }
+}
+
+DEFAULT_DAG_PARSING_LOGGING_CONFIG: Dict[str, Dict[str, Dict[str, Any]]] = {
+    'handlers': {
+        'processor_manager': {
+            'class': 'logging.handlers.RotatingFileHandler',
+            'formatter': 'airflow',
+            'filename': DAG_PROCESSOR_MANAGER_LOG_LOCATION,
+            'mode': 'a',
+            'maxBytes': 104857600,  # 100MB
+            'backupCount': 5
+        }
+    },
+    'loggers': {
+        'airflow.processor_manager': {
+            'handlers': ['processor_manager'],
+            'level': LOG_LEVEL,
+            'propagate': False,
+        }
+    }
+}
+
+# Only update the handlers and loggers when CONFIG_PROCESSOR_MANAGER_LOGGER is set.
+# This is to avoid exceptions when initializing RotatingFileHandler multiple times
+# in multiple processes.
+if os.environ.get('CONFIG_PROCESSOR_MANAGER_LOGGER') == 'True':
+    DEFAULT_LOGGING_CONFIG['handlers'] \
+        .update(DEFAULT_DAG_PARSING_LOGGING_CONFIG['handlers'])
+    DEFAULT_LOGGING_CONFIG['loggers'] \
+        .update(DEFAULT_DAG_PARSING_LOGGING_CONFIG['loggers'])
+
+    # Manually create the log directory for the processor_manager handler, as
+    # RotatingFileHandler will only create the file, not the directory.
+    processor_manager_handler_config: Dict[str, Any] = \
+        DEFAULT_DAG_PARSING_LOGGING_CONFIG['handlers']['processor_manager']
+    directory: str = os.path.dirname(processor_manager_handler_config['filename'])
+    mkdirs(directory, 0o755)
+
+##################
+# Remote logging #
+##################
+
+REMOTE_LOGGING: bool = conf.getboolean('logging', 'remote_logging')
+
+if REMOTE_LOGGING:
+
+    ELASTICSEARCH_HOST: str = conf.get('elasticsearch', 'HOST')
+
+    # Storage bucket URL for remote logging
+    # S3 buckets should start with "s3://"
+    # Cloudwatch log groups should start with "cloudwatch://"
+    # GCS buckets should start with "gs://"
+    # WASB buckets should start with "wasb"
+    # just to help Airflow select the correct handler
+    REMOTE_BASE_LOG_FOLDER: str = conf.get('logging', 'REMOTE_BASE_LOG_FOLDER')
+
+    if REMOTE_BASE_LOG_FOLDER.startswith('s3://'):
+        S3_REMOTE_HANDLERS: Dict[str, Dict[str, str]] = {
+            'task': {
+                'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
+                'formatter': 'airflow',
+                'base_log_folder': str(os.path.expanduser(BASE_LOG_FOLDER)),
+                's3_log_folder': REMOTE_BASE_LOG_FOLDER,
+                'filename_template': FILENAME_TEMPLATE,
+            },
+        }
+
+        DEFAULT_LOGGING_CONFIG['handlers'].update(S3_REMOTE_HANDLERS)
+    elif REMOTE_BASE_LOG_FOLDER.startswith('cloudwatch://'):
+        CLOUDWATCH_REMOTE_HANDLERS: Dict[str, Dict[str, str]] = {
+            'task': {
+                'class': 'airflow.utils.log.cloudwatch_task_handler.CloudwatchTaskHandler',
+                'formatter': 'airflow',
+                'base_log_folder': str(os.path.expanduser(BASE_LOG_FOLDER)),
+                'log_group_arn': urlparse(REMOTE_BASE_LOG_FOLDER).netloc,
+                'filename_template': FILENAME_TEMPLATE,
+            },
+        }
+
+        DEFAULT_LOGGING_CONFIG['handlers'].update(CLOUDWATCH_REMOTE_HANDLERS)
+    elif REMOTE_BASE_LOG_FOLDER.startswith('gs://'):
+        GCS_REMOTE_HANDLERS: Dict[str, Dict[str, str]] = {
+            'task': {
+                'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
+                'formatter': 'airflow',
+                'base_log_folder': str(os.path.expanduser(BASE_LOG_FOLDER)),
+                'gcs_log_folder': REMOTE_BASE_LOG_FOLDER,
+                'filename_template': FILENAME_TEMPLATE,
+            },
+        }
+
+        DEFAULT_LOGGING_CONFIG['handlers'].update(GCS_REMOTE_HANDLERS)
+    elif REMOTE_BASE_LOG_FOLDER.startswith('wasb'):
+        WASB_REMOTE_HANDLERS: Dict[str, Dict[str, Union[str, bool]]] = {
+            'task': {
+                'class': 'airflow.utils.log.wasb_task_handler.WasbTaskHandler',
+                'formatter': 'airflow',
+                'base_log_folder': str(os.path.expanduser(BASE_LOG_FOLDER)),
+                'wasb_log_folder': REMOTE_BASE_LOG_FOLDER,
+                'wasb_container': 'airflow-logs',
+                'filename_template': FILENAME_TEMPLATE,
+                'delete_local_copy': False,
+            },
+        }
+
+        DEFAULT_LOGGING_CONFIG['handlers'].update(WASB_REMOTE_HANDLERS)
+    elif REMOTE_BASE_LOG_FOLDER.startswith('stackdriver://'):
+        gcp_conn_id = conf.get('core', 'REMOTE_LOG_CONN_ID', fallback=None)
+        # stackdriver:///airflow-tasks => airflow-tasks
+        log_name = urlparse(REMOTE_BASE_LOG_FOLDER).path[1:]
+        STACKDRIVER_REMOTE_HANDLERS = {
+            'task': {
+                'class': 'airflow.utils.log.stackdriver_task_handler.StackdriverTaskHandler',
+                'formatter': 'airflow',
+                'name': log_name,
+                'gcp_conn_id': gcp_conn_id
+            }
+        }
+
+        DEFAULT_LOGGING_CONFIG['handlers'].update(STACKDRIVER_REMOTE_HANDLERS)
+    elif ELASTICSEARCH_HOST:
+        ELASTICSEARCH_LOG_ID_TEMPLATE: str = conf.get('elasticsearch', 'LOG_ID_TEMPLATE')
+        ELASTICSEARCH_END_OF_LOG_MARK: str = conf.get('elasticsearch', 'END_OF_LOG_MARK')
+        ELASTICSEARCH_WRITE_STDOUT: bool = conf.getboolean('elasticsearch', 'WRITE_STDOUT')
+        ELASTICSEARCH_JSON_FORMAT: bool = conf.getboolean('elasticsearch', 'JSON_FORMAT')
+        ELASTICSEARCH_JSON_FIELDS: str = conf.get('elasticsearch', 'JSON_FIELDS')
+
+        ELASTIC_REMOTE_HANDLERS: Dict[str, Dict[str, Union[str, bool]]] = {
+            'task': {
+                'class': 'airflow.utils.log.es_task_handler.ElasticsearchTaskHandler',
+                'formatter': 'airflow',
+                'base_log_folder': str(os.path.expanduser(BASE_LOG_FOLDER)),
+                'log_id_template': ELASTICSEARCH_LOG_ID_TEMPLATE,
+                'filename_template': FILENAME_TEMPLATE,
+                'end_of_log_mark': ELASTICSEARCH_END_OF_LOG_MARK,
+                'host': ELASTICSEARCH_HOST,
+                'write_stdout': ELASTICSEARCH_WRITE_STDOUT,
+                'json_format': ELASTICSEARCH_JSON_FORMAT,
+                'json_fields': ELASTICSEARCH_JSON_FIELDS
+            },
+        }
+
+        DEFAULT_LOGGING_CONFIG['handlers'].update(ELASTIC_REMOTE_HANDLERS)
+    else:
+        raise AirflowException(

Review comment:
       Will this block the configuration of other task handlers? Previously you could overwrite the entire file, so it wasn't a big problem.
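
       For example, with the previous layout you could point `logging_config_class` in airflow.cfg at a copy of the defaults and swap the `task` handler wholesale. A minimal sketch of that override path (the module path, handler class, and log folder below are hypothetical):

```python
# my_company/log_config.py -- a hypothetical module referenced from
# logging_config_class in airflow.cfg (the pre-PR override path)
from copy import deepcopy

from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG

# Copy the shipped defaults and replace the whole 'task' handler.
LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)
LOGGING_CONFIG['handlers']['task'] = {
    'class': 'my_company.log.CustomTaskHandler',  # hypothetical handler
    'formatter': 'airflow',
    'base_log_folder': '/var/log/airflow/tasks',  # hypothetical path
}
```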




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

