vikramarsid opened a new issue #18540:
URL: https://github.com/apache/airflow/issues/18540


   ### Apache Airflow version
   
   2.2.0b2 (beta snapshot)
   
   ### Operating System
   
   linux/amd64
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Other Docker-based deployment
   
   ### Deployment details
   
   Dockerfile-based deployment with DAGs copied into the container. An airflow_local_settings.py file is also added to the container so that a different JSON library (ujson) can be used in place of the standard json library.
   
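   For reference, the JSON swap itself only needs a module-level `json` attribute in airflow_local_settings.py, per the DAG serialization docs linked in the reproduction below; a minimal sketch, assuming ujson is installed in the image:

   ```
   # Minimal airflow_local_settings.py sketch (assumes ujson is installed).
   # Per the DAG serialization docs, Airflow picks up a module-level `json`
   # attribute from airflow_local_settings and uses it in place of stdlib json.
   import ujson

   json = ujson
   ```
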
   ### What happened
   
   An error occurred while initializing Airflow with airflow-init (docker-compose):
   
   ```
   Failed to import airflow_local_settings.
   2021-09-26T19:39:29.007516600Z Traceback (most recent call last):
   2021-09-26T19:39:29.007945200Z   File "/home/airflow/.local/lib/python3.9/site-packages/airflow/settings.py", line 441, in import_local_settings
   2021-09-26T19:39:29.008005300Z     import airflow_local_settings
   2021-09-26T19:39:29.008025000Z   File "/opt/airflow/airflow_local_settings.py", line 26, in <module>
   2021-09-26T19:39:29.008038200Z     from airflow import AirflowException
   2021-09-26T19:39:29.008057800Z ImportError: cannot import name 'AirflowException' from partially initialized module 'airflow' (most likely due to a circular import) (/home/airflow/.local/lib/python3.9/site-packages/airflow/__init__.py)
   2021-09-26T19:39:29.008076900Z Traceback (most recent call last):
   2021-09-26T19:39:29.008200800Z   File "/home/airflow/.local/bin/airflow", line 5, in <module>
   2021-09-26T19:39:29.008222900Z     from airflow.__main__ import main
   2021-09-26T19:39:29.008242500Z   File "/home/airflow/.local/lib/python3.9/site-packages/airflow/__init__.py", line 46, in <module>
   2021-09-26T19:39:29.008262200Z     settings.initialize()
   2021-09-26T19:39:29.008284000Z   File "/home/airflow/.local/lib/python3.9/site-packages/airflow/settings.py", line 481, in initialize
   2021-09-26T19:39:29.008311700Z     import_local_settings()
   2021-09-26T19:39:29.008330800Z   File "/home/airflow/.local/lib/python3.9/site-packages/airflow/settings.py", line 441, in import_local_settings
   2021-09-26T19:39:29.008348300Z     import airflow_local_settings
   2021-09-26T19:39:29.008363400Z   File "/opt/airflow/airflow_local_settings.py", line 26, in <module>
   2021-09-26T19:39:29.008383200Z     from airflow import AirflowException
   2021-09-26T19:39:29.008402100Z ImportError: cannot import name 'AirflowException' from partially initialized module 'airflow' (most likely due to a circular import) (/home/airflow/.local/lib/python3.9/site-packages/airflow/__init__.py)
   2021-09-26T19:39:29.072781700Z
   ```
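
   The failure is a chicken-and-egg problem: airflow/__init__.py calls settings.initialize(), which imports airflow_local_settings, before the top-level airflow package has finished binding its own names, so `from airflow import AirflowException` sees a partially initialized module. A likely workaround sketch for the settings file, importing the exception from its defining module instead of the package root:

   ```
   # Workaround sketch for airflow_local_settings.py: import the exception
   # from its defining module rather than from the top-level `airflow`
   # package, which is still only partially initialized at this point.
   from airflow.exceptions import AirflowException
   ```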
   
   ### What you expected to happen
   
   I would expect Airflow to initialize without any errors.
   
   ### How to reproduce
   
   Add an airflow_local_settings.py like the following:
   
   ```
   # -*- coding: utf-8 -*-
   #
   # Licensed to the Apache Software Foundation (ASF) under one
   # or more contributor license agreements.  See the NOTICE file
   # distributed with this work for additional information
   # regarding copyright ownership.  The ASF licenses this file
   # to you under the Apache License, Version 2.0 (the
   # "License"); you may not use this file except in compliance
   # with the License.  You may obtain a copy of the License at
   #
   #   http://www.apache.org/licenses/LICENSE-2.0
   #
   # Unless required by applicable law or agreed to in writing,
   # software distributed under the License is distributed on an
   # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   # KIND, either express or implied.  See the License for the
   # specific language governing permissions and limitations
   # under the License.
   
   import os
   from typing import Any
   from typing import Dict
   
   import six
   import ujson
   from airflow import AirflowException
   from airflow.configuration import conf
   from airflow.utils.file import mkdirs
   
   # Using ujson json library for performance
   # Using ujson json library for performance
   # https://airflow.apache.org/docs/stable/dag-serialization.html#using-a-different-json-library
   json = ujson
   
   # TODO: Logging format and level should be configured
   # in this file instead of from airflow.cfg. Currently
   # there are other log format and level configurations in
   # settings.py and cli.py. Please see AIRFLOW-1455.
   LOG_LEVEL = conf.get("logging", "LOGGING_LEVEL").upper()
   
   # Flask appbuilder's info level log is very verbose,
   # so it's set to 'WARN' by default.
   FAB_LOG_LEVEL = conf.get("logging", "FAB_LOGGING_LEVEL").upper()
   
   LOG_FORMAT = conf.get("logging", "LOG_FORMAT")
   
   COLORED_LOG_FORMAT = conf.get("logging", "COLORED_LOG_FORMAT")
   
   COLORED_LOG = conf.getboolean("logging", "COLORED_CONSOLE_LOG")
   
   COLORED_FORMATTER_CLASS = conf.get("logging", "COLORED_FORMATTER_CLASS")
   
   BASE_LOG_FOLDER = conf.get("logging", "BASE_LOG_FOLDER")
   
   PROCESSOR_LOG_FOLDER = conf.get("scheduler", "CHILD_PROCESS_LOG_DIRECTORY")
   
   DAG_PROCESSOR_MANAGER_LOG_LOCATION = conf.get(
       "logging", "DAG_PROCESSOR_MANAGER_LOG_LOCATION"
   )
   
   FILENAME_TEMPLATE = conf.get("logging", "LOG_FILENAME_TEMPLATE")
   
   PROCESSOR_FILENAME_TEMPLATE = conf.get("logging", "LOG_PROCESSOR_FILENAME_TEMPLATE")
   
   FORMATTER_CLASS_KEY = "()" if six.PY2 else "class"
   
   DEFAULT_LOGGING_CONFIG = {
       "version": 1,
       "disable_existing_loggers": False,
       "formatters": {
           "airflow": {"format": LOG_FORMAT},
           "airflow_coloured": {
               "format": COLORED_LOG_FORMAT if COLORED_LOG else LOG_FORMAT,
               FORMATTER_CLASS_KEY: COLORED_FORMATTER_CLASS
               if COLORED_LOG
               else "logging.Formatter",
           },
       },
       "handlers": {
           "console": {
               "class": "airflow.utils.log.logging_mixin.RedirectStdHandler",
               "formatter": "airflow_coloured",
               "stream": "sys.stdout",
           },
           "task": {
               "class": "airflow.utils.log.file_task_handler.FileTaskHandler",
               "formatter": "airflow",
               "base_log_folder": os.path.expanduser(BASE_LOG_FOLDER),
               "filename_template": FILENAME_TEMPLATE,
           },
           "processor": {
               "class": 
"airflow.utils.log.file_processor_handler.FileProcessorHandler",
               "formatter": "airflow",
               "base_log_folder": os.path.expanduser(PROCESSOR_LOG_FOLDER),
               "filename_template": PROCESSOR_FILENAME_TEMPLATE,
           },
       },
       "loggers": {
           "airflow.processor": {
               "handlers": ["processor"],
               "level": LOG_LEVEL,
               "propagate": False,
           },
           "airflow.task": {
               "handlers": ["task"],
               "level": LOG_LEVEL,
               "propagate": False,
           },
           "flask_appbuilder": {
               "handler": ["console"],
               "level": FAB_LOG_LEVEL,
               "propagate": True,
           },
       },
       "root": {
           "handlers": ["console"],
           "level": LOG_LEVEL,
       },
   }  # type: Dict[str, Any]
   
   DEFAULT_DAG_PARSING_LOGGING_CONFIG = {
       "handlers": {
           "processor_manager": {
               "class": "logging.handlers.RotatingFileHandler",
               "formatter": "airflow",
               "filename": DAG_PROCESSOR_MANAGER_LOG_LOCATION,
               "mode": "a",
               "maxBytes": 104857600,  # 100MB
               "backupCount": 5,
           }
       },
       "loggers": {
           "airflow.processor_manager": {
               "handlers": ["processor_manager"],
               "level": LOG_LEVEL,
               "propagate": False,
           }
       },
   }
   
   # Only update the handlers and loggers when CONFIG_PROCESSOR_MANAGER_LOGGER is set.
   # This is to avoid exceptions when initializing RotatingFileHandler multiple times
   # in multiple processes.
   if os.environ.get("CONFIG_PROCESSOR_MANAGER_LOGGER") == "True":
       DEFAULT_LOGGING_CONFIG["handlers"].update(
           DEFAULT_DAG_PARSING_LOGGING_CONFIG["handlers"]
       )
       DEFAULT_LOGGING_CONFIG["loggers"].update(
           DEFAULT_DAG_PARSING_LOGGING_CONFIG["loggers"]
       )
   
       # Manually create log directory for processor_manager handler as
       # RotatingFileHandler will only create file but not the directory.
       processor_manager_handler_config = DEFAULT_DAG_PARSING_LOGGING_CONFIG["handlers"][
           "processor_manager"
       ]
       directory = os.path.dirname(processor_manager_handler_config["filename"])
       mkdirs(directory, 0o755)
   
   ##################
   # Remote logging #
   ##################
   
   REMOTE_LOGGING = conf.getboolean("logging", "remote_logging")
   
   if REMOTE_LOGGING:
   
       ELASTICSEARCH_HOST = conf.get("elasticsearch", "HOST")
   
       # Storage bucket URL for remote logging
       # S3 buckets should start with "s3://"
       # GCS buckets should start with "gs://"
       # WASB buckets should start with "wasb"
       # just to help Airflow select correct handler
       REMOTE_BASE_LOG_FOLDER = conf.get("logging", "REMOTE_BASE_LOG_FOLDER")
   
       if REMOTE_BASE_LOG_FOLDER.startswith("s3://"):
           S3_REMOTE_HANDLERS = {
               "task": {
                   "class": "airflow.utils.log.s3_task_handler.S3TaskHandler",
                   "formatter": "airflow",
                   "base_log_folder": os.path.expanduser(BASE_LOG_FOLDER),
                   "s3_log_folder": REMOTE_BASE_LOG_FOLDER,
                   "filename_template": FILENAME_TEMPLATE,
               },
           }
   
           DEFAULT_LOGGING_CONFIG["handlers"].update(S3_REMOTE_HANDLERS)
       elif REMOTE_BASE_LOG_FOLDER.startswith("gs://"):
           GCS_REMOTE_HANDLERS = {
               "task": {
                   "class": "airflow.utils.log.gcs_task_handler.GCSTaskHandler",
                   "formatter": "airflow",
                   "base_log_folder": os.path.expanduser(BASE_LOG_FOLDER),
                   "gcs_log_folder": REMOTE_BASE_LOG_FOLDER,
                   "filename_template": FILENAME_TEMPLATE,
               },
           }
   
           DEFAULT_LOGGING_CONFIG["handlers"].update(GCS_REMOTE_HANDLERS)
       elif REMOTE_BASE_LOG_FOLDER.startswith("wasb"):
           WASB_REMOTE_HANDLERS = {
               "task": {
                   "class": 
"airflow.utils.log.wasb_task_handler.WasbTaskHandler",
                   "formatter": "airflow",
                   "base_log_folder": os.path.expanduser(BASE_LOG_FOLDER),
                   "wasb_log_folder": REMOTE_BASE_LOG_FOLDER,
                   "wasb_container": "airflow-logs",
                   "filename_template": FILENAME_TEMPLATE,
                   "delete_local_copy": False,
               },
           }
   
           DEFAULT_LOGGING_CONFIG["handlers"].update(WASB_REMOTE_HANDLERS)
       elif ELASTICSEARCH_HOST:
           ELASTICSEARCH_LOG_ID_TEMPLATE = conf.get("elasticsearch", "LOG_ID_TEMPLATE")
           ELASTICSEARCH_END_OF_LOG_MARK = conf.get("elasticsearch", "END_OF_LOG_MARK")
           ELASTICSEARCH_WRITE_STDOUT = conf.getboolean("elasticsearch", "WRITE_STDOUT")
           ELASTICSEARCH_JSON_FORMAT = conf.getboolean("elasticsearch", "JSON_FORMAT")
           ELASTICSEARCH_JSON_FIELDS = conf.get("elasticsearch", "JSON_FIELDS")
   
           ELASTIC_REMOTE_HANDLERS = {
               "task": {
                   "class": 
"airflow.utils.log.es_task_handler.ElasticsearchTaskHandler",
                   "formatter": "airflow",
                   "base_log_folder": os.path.expanduser(BASE_LOG_FOLDER),
                   "log_id_template": ELASTICSEARCH_LOG_ID_TEMPLATE,
                   "filename_template": FILENAME_TEMPLATE,
                   "end_of_log_mark": ELASTICSEARCH_END_OF_LOG_MARK,
                   "host": ELASTICSEARCH_HOST,
                   "write_stdout": ELASTICSEARCH_WRITE_STDOUT,
                   "json_format": ELASTICSEARCH_JSON_FORMAT,
                   "json_fields": ELASTICSEARCH_JSON_FIELDS,
               },
           }
   
           DEFAULT_LOGGING_CONFIG["handlers"].update(ELASTIC_REMOTE_HANDLERS)
       else:
           raise AirflowException(
               "Incorrect remote log configuration. Please check the configuration of option 'host' in "
               "section 'elasticsearch' if you are using Elasticsearch. In the other case, "
               "'remote_base_log_folder' option in 'core' section."
           )
   
   
   ```
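
   With that file in place, simply importing the package inside the container reproduces the failure, since the import itself triggers settings.initialize() (as the traceback above shows):

   ```
   # Reproduction sketch: importing the airflow package runs
   # settings.initialize(), which imports airflow_local_settings and
   # fails with the circular-import ImportError shown above.
   import airflow
   ```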
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

